import asyncio
import warnings
from dataclasses import dataclass
from datetime import datetime
from os import getenv
from typing import (
    Any,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Tuple,
    Type,
    Union,
    cast,
    overload,
)
from uuid import uuid4

from fastapi import WebSocket
from pydantic import BaseModel

from agno.agent.agent import Agent
from agno.db.base import AsyncBaseDb, BaseDb, SessionType
from agno.exceptions import InputCheckError, OutputCheckError, RunCancelledException
from agno.media import Audio, File, Image, Video
from agno.models.message import Message
from agno.models.metrics import Metrics
from agno.run import RunContext, RunStatus
from agno.run.agent import RunContentEvent, RunEvent, RunOutput
from agno.run.cancel import (
    cancel_run as cancel_run_global,
)
from agno.run.cancel import (
    cleanup_run,
    raise_if_cancelled,
    register_run,
)
from agno.run.team import RunContentEvent as TeamRunContentEvent
from agno.run.team import TeamRunEvent
from agno.run.workflow import (
    StepOutputEvent,
    WorkflowCancelledEvent,
    WorkflowCompletedEvent,
    WorkflowRunEvent,
    WorkflowRunOutput,
    WorkflowRunOutputEvent,
    WorkflowStartedEvent,
)
from agno.session.workflow import WorkflowChatInteraction, WorkflowSession
from agno.team.team import Team
from agno.utils.common import is_typed_dict, validate_typed_dict
from agno.utils.log import (
    log_debug,
    log_error,
    log_warning,
    logger,
    set_log_level_to_debug,
    set_log_level_to_info,
    use_workflow_logger,
)
from agno.utils.print_response.workflow import (
    aprint_response,
    aprint_response_stream,
    print_response,
    print_response_stream,
)
from agno.workflow import WorkflowAgent
from agno.workflow.condition import Condition
from agno.workflow.loop import Loop
from agno.workflow.parallel import Parallel
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.steps import Steps
from agno.workflow.types import (
    StepInput,
    StepMetrics,
    StepOutput,
    StepType,
    WebSocketHandler,
    WorkflowExecutionInput,
    WorkflowMetrics,
)

STEP_TYPE_MAPPING = {
    Step: StepType.STEP,
    Steps: StepType.STEPS,
    Loop: StepType.LOOP,
    Parallel: StepType.PARALLEL,
    Condition: StepType.CONDITION,
    Router: StepType.ROUTER,
}

WorkflowSteps = Union[
    Callable[
        ["Workflow", WorkflowExecutionInput],
        Union[StepOutput, Awaitable[StepOutput], Iterator[StepOutput], AsyncIterator[StepOutput], Any],
    ],
    Steps,
    List[
        Union[
            Callable[
                [StepInput], Union[StepOutput, Awaitable[StepOutput], Iterator[StepOutput], AsyncIterator[StepOutput]]
            ],
            Step,
            Steps,
            Loop,
            Parallel,
            Condition,
            Router,
        ]
    ],
]


@dataclass
class Workflow:
    """Pipeline-based workflow execution"""

    # Workflow identification - make name optional with default
    name: Optional[str] = None
    # Workflow ID (autogenerated if not set)
    id: Optional[str] = None
    # Workflow description
    description: Optional[str] = None

    # Workflow steps
    steps: Optional[WorkflowSteps] = None

    # Database to use for this workflow
    db: Optional[Union[BaseDb, AsyncBaseDb]] = None

    # Agentic Workflow - WorkflowAgent that decides when to run the workflow
    agent: Optional[WorkflowAgent] = None  # type: ignore

    # Default session_id to use for this workflow (autogenerated if not set)
    session_id: Optional[str] = None
    # Default user_id to use for this workflow
    user_id: Optional[str] = None
    # Default session state (stored in the database to persist across runs)
    session_state: Optional[Dict[str, Any]] = None
    # Set to True to overwrite the stored session_state with the session_state provided in the run
    overwrite_db_session_state: bool = False

    # If True, the workflow runs in debug mode
    debug_mode: Optional[bool] = False

    # --- Workflow Streaming ---
    # Stream the response from the Workflow
    stream: Optional[bool] = None
    # Stream the intermediate steps from the Workflow
    stream_events: bool = False
    # Stream events from executors (agents/teams/functions) within steps
    stream_executor_events: bool = True

    # Persist the events on the run response
    store_events: bool = False
    # Events to skip when persisting the events on the run response
    events_to_skip: Optional[List[Union[WorkflowRunEvent, RunEvent, TeamRunEvent]]] = None

    # Control whether to store executor responses (agent/team responses) in flattened runs
    store_executor_outputs: bool = True

    websocket_handler: Optional[WebSocketHandler] = None

    # Input schema to validate the input to the workflow
    input_schema: Optional[Type[BaseModel]] = None

    # Metadata stored with this workflow
    metadata: Optional[Dict[str, Any]] = None

    # --- Telemetry ---
    # telemetry=True logs minimal telemetry for analytics
    # This helps us improve the Agent and provide better support
    telemetry: bool = True

    # Add this flag to control if the workflow should add history to the steps
    add_workflow_history_to_steps: bool = False
    # Number of historical runs to include in the messages
    num_history_runs: int = 3

    # Deprecated. Use stream_events instead.
    stream_intermediate_steps: bool = False

    # If True, run hooks as FastAPI background tasks (non-blocking). Set by AgentOS.
    _run_hooks_in_background: bool = False

    def __init__(
        self,
        id: Optional[str] = None,
        name: Optional[str] = None,
        description: Optional[str] = None,
        db: Optional[Union[BaseDb, AsyncBaseDb]] = None,
        steps: Optional[WorkflowSteps] = None,
        agent: Optional[WorkflowAgent] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        overwrite_db_session_state: bool = False,
        user_id: Optional[str] = None,
        debug_level: Literal[1, 2] = 1,
        debug_mode: Optional[bool] = False,
        stream: Optional[bool] = None,
        stream_events: bool = False,
        stream_intermediate_steps: bool = False,
        stream_executor_events: bool = True,
        store_events: bool = False,
        events_to_skip: Optional[List[Union[WorkflowRunEvent, RunEvent, TeamRunEvent]]] = None,
        store_executor_outputs: bool = True,
        input_schema: Optional[Type[BaseModel]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        cache_session: bool = False,
        telemetry: bool = True,
        add_workflow_history_to_steps: bool = False,
        num_history_runs: int = 3,
    ):
        self.id = id
        self.name = name
        self.description = description
        self.steps = steps
        self.agent = agent
        self.session_id = session_id
        self.session_state = session_state
        self.overwrite_db_session_state = overwrite_db_session_state
        self.user_id = user_id
        self.debug_mode = debug_mode
        self.debug_level = debug_level
        self.store_events = store_events
        self.events_to_skip = events_to_skip or []
        self.stream = stream
        self.stream_executor_events = stream_executor_events
        self.store_executor_outputs = store_executor_outputs
        self.input_schema = input_schema
        self.metadata = metadata
        self.cache_session = cache_session
        self.db = db
        self.telemetry = telemetry
        self.add_workflow_history_to_steps = add_workflow_history_to_steps
        self.num_history_runs = num_history_runs
        self._workflow_session: Optional[WorkflowSession] = None

        if stream_intermediate_steps:
            warnings.warn(
                "The 'stream_intermediate_steps' parameter is deprecated and will be removed in future versions. Use 'stream_events' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        self.stream_events = stream_events or stream_intermediate_steps

        # Warn if workflow history is enabled without a database
        if self.add_workflow_history_to_steps and self.db is None:
            log_warning(
                "Workflow history is enabled (add_workflow_history_to_steps=True) but no database is configured. "
                "History won't be persisted. Add a database to persist runs across executions. "
            )

    def set_id(self) -> None:
        if self.id is None:
            if self.name is not None:
                self.id = self.name.lower().replace(" ", "-")
            else:
                self.id = str(uuid4())

    def _has_async_db(self) -> bool:
        return self.db is not None and isinstance(self.db, AsyncBaseDb)

    def _validate_input(
        self, input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]]
    ) -> Optional[Union[str, List, Dict, Message, BaseModel]]:
        """Parse and validate input against input_schema if provided"""
        if self.input_schema is None:
            return input  # Return input unchanged if no schema is set

        if input is None:
            raise ValueError("Input required when input_schema is set")

        # Handle Message objects - extract content
        if isinstance(input, Message):
            input = input.content  # type: ignore

        # If input is a string, convert it to a dict
        if isinstance(input, str):
            import json

            try:
                input = json.loads(input)
            except Exception as e:
                raise ValueError(f"Failed to parse input. Is it a valid JSON string?: {e}")

        # Case 1: Message is already a BaseModel instance
        if isinstance(input, BaseModel):
            if isinstance(input, self.input_schema):
                try:
                    return input
                except Exception as e:
                    raise ValueError(f"BaseModel validation failed: {str(e)}")
            else:
                # Different BaseModel types
                raise ValueError(f"Expected {self.input_schema.__name__} but got {type(input).__name__}")

        # Case 2: Message is a dict
        elif isinstance(input, dict):
            try:
                # Check if the schema is a TypedDict
                if is_typed_dict(self.input_schema):
                    validated_dict = validate_typed_dict(input, self.input_schema)
                    return validated_dict
                else:
                    validated_model = self.input_schema(**input)
                    return validated_model
            except Exception as e:
                raise ValueError(f"Failed to parse dict into {self.input_schema.__name__}: {str(e)}")

        # Case 3: Other types not supported for structured input
        else:
            raise ValueError(
                f"Cannot validate {type(input)} against input_schema. Expected dict or {self.input_schema.__name__} instance."
            )

    @property
    def run_parameters(self) -> Dict[str, Any]:
        """Get the run parameters for the workflow"""

        if self.steps is None:
            return {}

        parameters = {}

        if self.steps and callable(self.steps):
            from inspect import Parameter, signature

            sig = signature(self.steps)  # type: ignore

            for param_name, param in sig.parameters.items():
                if param_name not in ["workflow", "execution_input", "self"]:
                    parameters[param_name] = {
                        "name": param_name,
                        "default": param.default.default
                        if hasattr(param.default, "__class__") and param.default.__class__.__name__ == "FieldInfo"
                        else (param.default if param.default is not Parameter.empty else None),
                        "annotation": (
                            param.annotation.__name__
                            if hasattr(param.annotation, "__name__")
                            else (
                                str(param.annotation).replace("typing.Optional[", "").replace("]", "")
                                if "typing.Optional" in str(param.annotation)
                                else str(param.annotation)
                            )
                        )
                        if param.annotation is not Parameter.empty
                        else None,
                        "required": param.default is Parameter.empty,
                    }
        else:
            parameters = {
                "message": {
                    "name": "message",
                    "default": None,
                    "annotation": "str",
                    "required": True,
                },
            }

        return parameters

    def initialize_workflow(self):
        if self.id is None:
            self.set_id()
            log_debug(f"Generated new workflow_id: {self.id}")

    def _initialize_session(
        self,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> Tuple[str, Optional[str]]:
        """Initialize the session for the workflow."""

        if session_id is None:
            if self.session_id:
                session_id = self.session_id
            else:
                session_id = str(uuid4())
                # We make the session_id sticky to the agent instance if no session_id is provided
                self.session_id = session_id

        log_debug(f"Session ID: {session_id}", center=True)

        # Use the default user_id when necessary
        if user_id is None or user_id == "":
            user_id = self.user_id

        return session_id, user_id

    def _initialize_session_state(
        self,
        session_state: Dict[str, Any],
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Initialize the session state for the workflow."""
        if user_id:
            session_state["current_user_id"] = user_id
        if session_id is not None:
            session_state["current_session_id"] = session_id
        if run_id is not None:
            session_state["current_run_id"] = run_id

        session_state.update(
            {
                "workflow_id": self.id,
                "run_id": run_id,
                "session_id": session_id,
            }
        )
        if self.name:
            session_state["workflow_name"] = self.name

        return session_state

    def _generate_workflow_session_name(self) -> str:
        """Generate a name for the workflow session"""

        if self.session_id is None:
            return f"Workflow Session - {datetime.now().strftime('%Y-%m-%d %H:%M')}"

        datetime_str = datetime.now().strftime("%Y-%m-%d %H:%M")
        new_session_name = f"Workflow Session-{datetime_str}"

        if self.description:
            truncated_desc = self.description[:40] + "-" if len(self.description) > 40 else self.description
            new_session_name = f"{truncated_desc} - {datetime_str}"
        return new_session_name

    async def aset_session_name(
        self, session_id: Optional[str] = None, autogenerate: bool = False, session_name: Optional[str] = None
    ) -> WorkflowSession:
        """Set the session name and save to storage, using an async database"""
        session_id = session_id or self.session_id

        if session_id is None:
            raise Exception("Session ID is not set")

        # -*- Read from storage
        session = await self.aget_session(session_id=session_id)  # type: ignore

        if autogenerate:
            # -*- Generate name for session
            session_name = self._generate_workflow_session_name()
            log_debug(f"Generated Workflow Session Name: {session_name}")
        elif session_name is None:
            raise Exception("Session name is not set")

        # -*- Rename session
        session.session_data["session_name"] = session_name  # type: ignore

        # -*- Save to storage
        await self.asave_session(session=session)  # type: ignore

        return session  # type: ignore

    def set_session_name(
        self, session_id: Optional[str] = None, autogenerate: bool = False, session_name: Optional[str] = None
    ) -> WorkflowSession:
        """Set the session name and save to storage"""
        session_id = session_id or self.session_id

        if session_id is None:
            raise Exception("Session ID is not set")

        # -*- Read from storage
        session = self.get_session(session_id=session_id)  # type: ignore

        if autogenerate:
            # -*- Generate name for session
            session_name = self._generate_workflow_session_name()
            log_debug(f"Generated Workflow Session Name: {session_name}")
        elif session_name is None:
            raise Exception("Session name is not set")

        # -*- Rename session
        session.session_data["session_name"] = session_name  # type: ignore

        # -*- Save to storage
        self.save_session(session=session)  # type: ignore

        return session  # type: ignore

    async def aget_session_name(self, session_id: Optional[str] = None) -> str:
        """Get the session name for the given session ID and user ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = await self.aget_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_name", "") if session.session_data else ""

    def get_session_name(self, session_id: Optional[str] = None) -> str:
        """Get the session name for the given session ID and user ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = self.get_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_name", "") if session.session_data else ""

    async def aget_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Get the session state for the given session ID and user ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = await self.aget_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_state", {}) if session.session_data else {}

    def get_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Get the session state for the given session ID and user ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = self.get_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_state", {}) if session.session_data else {}

    def update_session_state(
        self, session_state_updates: Dict[str, Any], session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Update the session state for the given session ID.
        Args:
            session_state_updates: The updates to apply to the session state. Should be a dictionary of key-value pairs.
            session_id: The session ID to update. If not provided, the current cached session ID is used.
        Returns:
            dict: The updated session state.
        """
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = self.get_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")

        if session.session_data is not None and "session_state" not in session.session_data:
            session.session_data["session_state"] = {}

        for key, value in session_state_updates.items():
            session.session_data["session_state"][key] = value  # type: ignore

        self.save_session(session=session)

        return session.session_data["session_state"]  # type: ignore

    async def aupdate_session_state(
        self, session_state_updates: Dict[str, Any], session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Update the session state for the given session ID (async).
        Args:
            session_state_updates: The updates to apply to the session state. Should be a dictionary of key-value pairs.
            session_id: The session ID to update. If not provided, the current cached session ID is used.
        Returns:
            dict: The updated session state.
        """
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = await self.aget_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")

        if session.session_data is not None and "session_state" not in session.session_data:
            session.session_data["session_state"] = {}  # type: ignore

        for key, value in session_state_updates.items():
            session.session_data["session_state"][key] = value  # type: ignore

        await self.asave_session(session=session)

        return session.session_data["session_state"]  # type: ignore

    async def adelete_session(self, session_id: str):
        """Delete the current session and save to storage"""
        if self.db is None:
            return
        # -*- Delete session
        await self.db.delete_session(session_id=session_id)  # type: ignore

    def delete_session(self, session_id: str):
        """Delete the current session and save to storage"""
        if self.db is None:
            return
        # -*- Delete session
        self.db.delete_session(session_id=session_id)

    async def aget_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get a RunOutput from the database."""
        if self._workflow_session is not None:
            run_response = self._workflow_session.get_run(run_id=run_id)
            if run_response is not None:
                return run_response
            else:
                log_warning(f"RunOutput {run_id} not found in AgentSession {self._workflow_session.session_id}")
                return None
        else:
            workflow_session = await self.aget_session(session_id=session_id)  # type: ignore
            if workflow_session is not None:
                run_response = workflow_session.get_run(run_id=run_id)
                if run_response is not None:
                    return run_response
                else:
                    log_warning(f"RunOutput {run_id} not found in AgentSession {session_id}")
        return None

    def get_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get a RunOutput from the database."""
        if self._workflow_session is not None:
            run_response = self._workflow_session.get_run(run_id=run_id)
            if run_response is not None:
                return run_response
            else:
                log_warning(f"RunOutput {run_id} not found in AgentSession {self._workflow_session.session_id}")
                return None
        else:
            workflow_session = self.get_session(session_id=session_id)
            if workflow_session is not None:
                run_response = workflow_session.get_run(run_id=run_id)
                if run_response is not None:
                    return run_response
                else:
                    log_warning(f"RunOutput {run_id} not found in AgentSession {session_id}")
        return None

    async def aget_last_run_output(self, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get the last run response from the database."""
        if (
            self._workflow_session is not None
            and self._workflow_session.runs is not None
            and len(self._workflow_session.runs) > 0
        ):
            run_response = self._workflow_session.runs[-1]
            if run_response is not None:
                return run_response
        else:
            workflow_session = await self.aget_session(session_id=session_id)  # type: ignore
            if workflow_session is not None and workflow_session.runs is not None and len(workflow_session.runs) > 0:
                run_response = workflow_session.runs[-1]
                if run_response is not None:
                    return run_response
            else:
                log_warning(f"No run responses found in WorkflowSession {session_id}")
                return None

    def get_last_run_output(self, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get the last run response from the database."""
        if (
            self._workflow_session is not None
            and self._workflow_session.runs is not None
            and len(self._workflow_session.runs) > 0
        ):
            run_response = self._workflow_session.runs[-1]
            if run_response is not None:
                return run_response
        else:
            workflow_session = self.get_session(session_id=session_id)
            if workflow_session is not None and workflow_session.runs is not None and len(workflow_session.runs) > 0:
                run_response = workflow_session.runs[-1]
                if run_response is not None:
                    return run_response
            else:
                log_warning(f"No run responses found in WorkflowSession {session_id}")
                return None

    def read_or_create_session(
        self,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> WorkflowSession:
        from time import time

        # Returning cached session if we have one
        if self._workflow_session is not None and self._workflow_session.session_id == session_id:
            return self._workflow_session

        # Try to load from database
        workflow_session = None
        if self.db is not None:
            log_debug(f"Reading WorkflowSession: {session_id}")

            workflow_session = cast(WorkflowSession, self._read_session(session_id=session_id))

        if workflow_session is None:
            # Creating new session if none found
            log_debug(f"Creating new WorkflowSession: {session_id}")
            session_data = {}
            if self.session_state is not None:
                from copy import deepcopy

                session_data["session_state"] = deepcopy(self.session_state)
            workflow_session = WorkflowSession(
                session_id=session_id,
                workflow_id=self.id,
                user_id=user_id,
                workflow_data=self._get_workflow_data(),
                session_data=session_data,
                metadata=self.metadata,
                created_at=int(time()),
            )

        # Cache the session if relevant
        if workflow_session is not None and self.cache_session:
            self._workflow_session = workflow_session

        return workflow_session

    async def aread_or_create_session(
        self,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> WorkflowSession:
        from time import time

        # Returning cached session if we have one
        if self._workflow_session is not None and self._workflow_session.session_id == session_id:
            return self._workflow_session

        # Try to load from database
        workflow_session = None
        if self.db is not None:
            log_debug(f"Reading WorkflowSession: {session_id}")

            workflow_session = cast(WorkflowSession, await self._aread_session(session_id=session_id))

        if workflow_session is None:
            # Creating new session if none found
            log_debug(f"Creating new WorkflowSession: {session_id}")
            workflow_session = WorkflowSession(
                session_id=session_id,
                workflow_id=self.id,
                user_id=user_id,
                workflow_data=self._get_workflow_data(),
                session_data={},
                metadata=self.metadata,
                created_at=int(time()),
            )

        # Cache the session if relevant
        if workflow_session is not None and self.cache_session:
            self._workflow_session = workflow_session

        return workflow_session

    async def aget_session(
        self,
        session_id: Optional[str] = None,
    ) -> Optional[WorkflowSession]:
        """Load an WorkflowSession from database.

        Args:
            session_id: The session_id to load from storage.

        Returns:
            WorkflowSession: The WorkflowSession loaded from the database or created if it does not exist.
        """
        session_id_to_load = session_id or self.session_id
        if session_id_to_load is None:
            raise Exception("No session_id provided")

        # Try to load from database
        if self.db is not None:
            workflow_session = cast(WorkflowSession, await self._aread_session(session_id=session_id_to_load))
            return workflow_session

        log_warning(f"WorkflowSession {session_id_to_load} not found in db")
        return None

    def get_session(
        self,
        session_id: Optional[str] = None,
    ) -> Optional[WorkflowSession]:
        """Load an WorkflowSession from database.

        Args:
            session_id: The session_id to load from storage.

        Returns:
            WorkflowSession: The WorkflowSession loaded from the database or created if it does not exist.
        """
        if not session_id and not self.session_id:
            raise Exception("No session_id provided")

        session_id_to_load = session_id or self.session_id

        # Try to load from database
        if self.db is not None and session_id_to_load is not None:
            workflow_session = cast(WorkflowSession, self._read_session(session_id=session_id_to_load))
            return workflow_session

        log_warning(f"WorkflowSession {session_id_to_load} not found in db")
        return None

    async def asave_session(self, session: WorkflowSession) -> None:
        """Save the WorkflowSession to storage, using an async database.

        Returns:
            Optional[WorkflowSession]: The saved WorkflowSession or None if not saved.
        """
        if self.db is not None and session.session_data is not None:
            if session.session_data.get("session_state") is not None:
                session.session_data["session_state"].pop("current_session_id", None)
                session.session_data["session_state"].pop("current_user_id", None)
                session.session_data["session_state"].pop("current_run_id", None)
                session.session_data["session_state"].pop("workflow_id", None)
                session.session_data["session_state"].pop("run_id", None)
                session.session_data["session_state"].pop("session_id", None)
                session.session_data["session_state"].pop("workflow_name", None)

            await self._aupsert_session(session=session)  # type: ignore
            log_debug(f"Created or updated WorkflowSession record: {session.session_id}")

    def save_session(self, session: WorkflowSession) -> None:
        """Save the WorkflowSession to storage

        Returns:
            Optional[WorkflowSession]: The saved WorkflowSession or None if not saved.
        """
        if self.db is not None and session.session_data is not None:
            if session.session_data.get("session_state") is not None:
                session.session_data["session_state"].pop("current_session_id", None)
                session.session_data["session_state"].pop("current_user_id", None)
                session.session_data["session_state"].pop("current_run_id", None)
                session.session_data["session_state"].pop("workflow_id", None)
                session.session_data["session_state"].pop("run_id", None)
                session.session_data["session_state"].pop("session_id", None)
                session.session_data["session_state"].pop("workflow_name", None)

            self._upsert_session(session=session)
            log_debug(f"Created or updated WorkflowSession record: {session.session_id}")

    def get_chat_history(
        self, session_id: Optional[str] = None, last_n_runs: Optional[int] = None
    ) -> List[WorkflowChatInteraction]:
        """Return a list of dictionaries containing the input and output for each run in the session.

        Args:
            session_id: The session ID to get the chat history for. If not provided, the current cached session ID is used.
            last_n_runs: Number of recent runs to include. If None, all runs will be considered.

        Returns:
            A list of WorkflowChatInteraction objects.
        """
        session_id = session_id or self.session_id
        if session_id is None:
            log_warning("Session ID is not set, cannot get messages for session")
            return []

        session = self.get_session(
            session_id=session_id,
        )
        if session is None:
            raise Exception("Session not found")

        return session.get_chat_history(last_n_runs=last_n_runs)

    async def aget_chat_history(
        self, session_id: Optional[str] = None, last_n_runs: Optional[int] = None
    ) -> List[WorkflowChatInteraction]:
        """Return a list of dictionaries containing the input and output for each run in the session.

        Args:
            session_id: The session ID to get the chat history for. If not provided, the current cached session ID is used.
            last_n_runs: Number of recent runs to include. If None, all runs will be considered.

        Returns:
            A list of dictionaries containing the input and output for each run.
        """
        session_id = session_id or self.session_id
        if session_id is None:
            log_warning("Session ID is not set, cannot get messages for session")
            return []

        session = await self.aget_session(session_id=session_id)
        if session is None:
            raise Exception("Session not found")

        return session.get_chat_history(last_n_runs=last_n_runs)

    # -*- Session Database Functions
    async def _aread_session(self, session_id: str) -> Optional[WorkflowSession]:
        """Get a Session from the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            session = await self.db.get_session(session_id=session_id, session_type=SessionType.WORKFLOW)  # type: ignore
            return session if isinstance(session, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error getting session from db: {e}")
            return None

    def _read_session(self, session_id: str) -> Optional[WorkflowSession]:
        """Get a Session from the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            session = self.db.get_session(session_id=session_id, session_type=SessionType.WORKFLOW)
            return session if isinstance(session, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error getting session from db: {e}")
            return None

    async def _aupsert_session(self, session: WorkflowSession) -> Optional[WorkflowSession]:
        """Upsert a Session into the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            result = await self.db.upsert_session(session=session)  # type: ignore
            return result if isinstance(result, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error upserting session into db: {e}")
            return None

    def _upsert_session(self, session: WorkflowSession) -> Optional[WorkflowSession]:
        """Upsert a Session into the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            result = self.db.upsert_session(session=session)
            return result if isinstance(result, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error upserting session into db: {e}")
            return None

    def _update_metadata(self, session: WorkflowSession):
        """Update the extra_data in the session"""
        from agno.utils.merge_dict import merge_dictionaries

        # Read metadata from the database
        if session.metadata is not None:
            # If metadata is set in the workflow, update the database metadata with the workflow's metadata
            if self.metadata is not None:
                # Updates workflow's session metadata in place
                merge_dictionaries(session.metadata, self.metadata)
            # Update the current metadata with the metadata from the database which is updated in place
            self.metadata = session.metadata

    def _load_session_state(self, session: WorkflowSession, session_state: Dict[str, Any]):
        """Load and return the stored session_state from the database, optionally merging it with the given one"""

        from agno.utils.merge_dict import merge_dictionaries

        # Get the session_state from the database and merge with proper precedence
        # At this point session_state contains: agent_defaults + run_params
        if session.session_data and "session_state" in session.session_data:
            session_state_from_db = session.session_data.get("session_state")

            if (
                session_state_from_db is not None
                and isinstance(session_state_from_db, dict)
                and len(session_state_from_db) > 0
                and not self.overwrite_db_session_state
            ):
                # This preserves precedence: run_params > db_state > agent_defaults
                merged_state = session_state_from_db.copy()
                merge_dictionaries(merged_state, session_state)
                session_state.clear()
                session_state.update(merged_state)

        # Update the session_state in the session
        if session.session_data is None:
            session.session_data = {}
        session.session_data["session_state"] = session_state

        return session_state

    def _get_workflow_data(self) -> Dict[str, Any]:
        workflow_data: Dict[str, Any] = {
            "workflow_id": self.id,
            "name": self.name,
        }

        if self.steps and not callable(self.steps):
            steps_dict = []
            for step in self.steps:  # type: ignore
                if callable(step):
                    step_type = StepType.STEP
                elif isinstance(step, Agent) or isinstance(step, Team):
                    step_type = StepType.STEP
                else:
                    step_type = STEP_TYPE_MAPPING[type(step)]
                step_dict = {
                    "name": step.name if hasattr(step, "name") else step.__name__,  # type: ignore
                    "description": step.description if hasattr(step, "description") else "User-defined callable step",
                    "type": step_type.value,
                }
                steps_dict.append(step_dict)

            workflow_data["steps"] = steps_dict

        elif callable(self.steps):
            workflow_data["steps"] = [
                {
                    "name": "Custom Function",
                    "description": "User-defined callable workflow",
                    "type": "Callable",
                }
            ]

        return workflow_data

    def _broadcast_to_websocket(
        self,
        event: Any,
        websocket_handler: Optional[WebSocketHandler] = None,
    ) -> None:
        """Broadcast events to WebSocket if available (async context only)"""
        if websocket_handler:
            try:
                loop = asyncio.get_running_loop()
                if loop:
                    asyncio.create_task(websocket_handler.handle_event(event))
            except RuntimeError:
                pass

    def _handle_event(
        self,
        event: "WorkflowRunOutputEvent",
        workflow_run_response: WorkflowRunOutput,
        websocket_handler: Optional[WebSocketHandler] = None,
    ) -> "WorkflowRunOutputEvent":
        """Handle workflow events for storage - similar to Team._handle_event"""
        from agno.run.agent import RunOutput
        from agno.run.base import BaseRunOutputEvent
        from agno.run.team import TeamRunOutput

        if isinstance(event, (RunOutput, TeamRunOutput)):
            return event
        if self.store_events:
            # Check if this event type should be skipped
            if self.events_to_skip:
                event_type = event.event
                for skip_event in self.events_to_skip:
                    if isinstance(skip_event, str):
                        if event_type == skip_event:
                            return event
                    else:
                        # It's a WorkflowRunEvent enum
                        if event_type == skip_event.value:
                            return event

            # Store the event
            if isinstance(event, BaseRunOutputEvent):
                if workflow_run_response.events is None:
                    workflow_run_response.events = []
                workflow_run_response.events.append(event)

        # Broadcast to WebSocket if available (async context only)
        self._broadcast_to_websocket(event, websocket_handler)

        return event

    def _enrich_event_with_workflow_context(
        self,
        event: Any,
        workflow_run_response: WorkflowRunOutput,
        step_index: Optional[Union[int, tuple]] = None,
        step: Optional[Any] = None,
    ) -> Any:
        """Enrich any event with workflow context information for frontend tracking"""

        step_id = getattr(step, "step_id", None) if step else None
        step_name = getattr(step, "name", None) if step else None

        if hasattr(event, "workflow_id"):
            event.workflow_id = workflow_run_response.workflow_id
        if hasattr(event, "workflow_run_id"):
            event.workflow_run_id = workflow_run_response.run_id
        if hasattr(event, "step_id") and step_id:
            event.step_id = step_id
        if hasattr(event, "step_name") and step_name is not None:
            if event.step_name is None:
                event.step_name = step_name
        # Only set step_index if it's not already set (preserve parallel.py's tuples)
        if hasattr(event, "step_index") and step_index is not None:
            if event.step_index is None:
                event.step_index = step_index

        return event

    def _transform_step_output_to_event(
        self, step_output: StepOutput, workflow_run_response: WorkflowRunOutput, step_index: Optional[int] = None
    ) -> StepOutputEvent:
        """Transform a StepOutput object into a StepOutputEvent for consistent streaming interface"""
        return StepOutputEvent(
            step_output=step_output,
            run_id=workflow_run_response.run_id or "",
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
            step_name=step_output.step_name,
            step_index=step_index,
        )

    def _set_debug(self) -> None:
        """Set debug mode and configure logging"""
        if self.debug_mode or getenv("AGNO_DEBUG", "false").lower() == "true":
            use_workflow_logger()
            debug_level: Literal[1, 2] = (
                cast(Literal[1, 2], int(env)) if (env := getenv("AGNO_DEBUG_LEVEL")) in ("1", "2") else self.debug_level
            )

            self.debug_mode = True
            set_log_level_to_debug(source_type="workflow", level=debug_level)

            # Propagate to steps - only if steps is iterable (not callable)
            if self.steps and not callable(self.steps):
                if isinstance(self.steps, Steps):
                    steps_to_iterate = self.steps.steps
                else:
                    steps_to_iterate = self.steps

                for step in steps_to_iterate:
                    self._propagate_debug_to_step(step)
        else:
            set_log_level_to_info(source_type="workflow")

    def _set_telemetry(self) -> None:
        """Override telemetry settings based on environment variables."""

        telemetry_env = getenv("AGNO_TELEMETRY")
        if telemetry_env is not None:
            self.telemetry = telemetry_env.lower() == "true"

    def _propagate_debug_to_step(self, step):
        """Recursively propagate debug mode to steps and nested primitives"""
        # Handle direct Step objects
        if hasattr(step, "active_executor") and step.active_executor:
            executor = step.active_executor
            if hasattr(executor, "debug_mode"):
                executor.debug_mode = True

            # If it's a team, propagate to all members
            if hasattr(executor, "members"):
                for member in executor.members:
                    if hasattr(member, "debug_mode"):
                        member.debug_mode = True

        # Handle nested primitives - check both 'steps' and 'choices' attributes
        for attr_name in ["steps", "choices"]:
            if hasattr(step, attr_name):
                attr_value = getattr(step, attr_name)
                if attr_value and isinstance(attr_value, list):
                    for nested_step in attr_value:
                        self._propagate_debug_to_step(nested_step)

    def _create_step_input(
        self,
        execution_input: WorkflowExecutionInput,
        previous_step_outputs: Optional[Dict[str, StepOutput]] = None,
        shared_images: Optional[List[Image]] = None,
        shared_videos: Optional[List[Video]] = None,
        shared_audio: Optional[List[Audio]] = None,
        shared_files: Optional[List[File]] = None,
    ) -> StepInput:
        """Helper method to create StepInput with enhanced data flow support"""

        previous_step_content = None
        if previous_step_outputs:
            last_output = list(previous_step_outputs.values())[-1]
            previous_step_content = last_output.content if last_output else None
            log_debug(f"Using previous step content from: {list(previous_step_outputs.keys())[-1]}")

        return StepInput(
            input=execution_input.input,
            previous_step_content=previous_step_content,
            previous_step_outputs=previous_step_outputs,
            additional_data=execution_input.additional_data,
            images=shared_images or [],
            videos=shared_videos or [],
            audio=shared_audio or [],
            files=shared_files or [],
        )

    def _get_step_count(self) -> int:
        """Get the number of steps in the workflow"""
        if self.steps is None:
            return 0
        elif callable(self.steps):
            return 1  # Callable function counts as 1 step
        else:
            # Handle Steps wrapper
            if isinstance(self.steps, Steps):
                return len(self.steps.steps)
            else:
                return len(self.steps)

    def _aggregate_workflow_metrics(
        self,
        step_results: List[Union[StepOutput, List[StepOutput]]],
        current_workflow_metrics: Optional[WorkflowMetrics] = None,
    ) -> WorkflowMetrics:
        """Aggregate metrics from all step responses into structured workflow metrics"""
        steps_dict = {}

        def process_step_output(step_output: StepOutput):
            """Process a single step output for metrics"""

            # If this step has nested steps, process them recursively
            if hasattr(step_output, "steps") and step_output.steps:
                for nested_step in step_output.steps:
                    process_step_output(nested_step)

            # Only collect metrics from steps that actually have metrics (actual agents/teams)
            if (
                step_output.step_name and step_output.metrics and step_output.executor_type in ["agent", "team"]
            ):  # Only include actual executors
                step_metrics = StepMetrics(
                    step_name=step_output.step_name,
                    executor_type=step_output.executor_type or "unknown",
                    executor_name=step_output.executor_name or "unknown",
                    metrics=step_output.metrics,
                )
                steps_dict[step_output.step_name] = step_metrics

        # Process all step results
        for step_result in step_results:
            process_step_output(cast(StepOutput, step_result))

        duration = None
        if current_workflow_metrics and current_workflow_metrics.duration is not None:
            duration = current_workflow_metrics.duration

        return WorkflowMetrics(
            steps=steps_dict,
            duration=duration,
        )

    def _call_custom_function(self, func: Callable, execution_input: WorkflowExecutionInput, **kwargs: Any) -> Any:
        """Call custom function with only the parameters it expects"""
        from inspect import signature

        sig = signature(func)

        # Build arguments based on what the function actually accepts
        call_kwargs: Dict[str, Any] = {}

        # Only add workflow and execution_input if the function expects them
        if "workflow" in sig.parameters:  # type: ignore
            call_kwargs["workflow"] = self
        if "execution_input" in sig.parameters:
            call_kwargs["execution_input"] = execution_input  # type: ignore
        if "session_state" in sig.parameters:
            call_kwargs["session_state"] = self.session_state  # type: ignore

        # Add any other kwargs that the function expects
        for param_name in kwargs:
            if param_name in sig.parameters:  # type: ignore
                call_kwargs[param_name] = kwargs[param_name]

        # If function has **kwargs parameter, pass all remaining kwargs
        for param in sig.parameters.values():  # type: ignore
            if param.kind == param.VAR_KEYWORD:
                call_kwargs.update(kwargs)
                break

        try:
            return func(**call_kwargs)
        except TypeError as e:
            # If signature inspection fails, fall back to original method
            logger.error(f"Function signature inspection failed: {e}. Falling back to original calling convention.")
            return func(**kwargs)

    def _accumulate_partial_step_data(
        self, event: Union[RunContentEvent, TeamRunContentEvent], partial_step_content: str
    ) -> str:
        """Accumulate partial step data from streaming events"""
        if isinstance(event, (RunContentEvent, TeamRunContentEvent)) and event.content:
            if isinstance(event.content, str):
                partial_step_content += event.content
        return partial_step_content

    def _execute(
        self,
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        workflow_run_response: WorkflowRunOutput,
        run_context: RunContext,
        background_tasks: Optional[Any] = None,
        **kwargs: Any,
    ) -> WorkflowRunOutput:
        """Execute a specific pipeline by name synchronously"""
        from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction

        workflow_run_response.status = RunStatus.running

        if callable(self.steps):
            if iscoroutinefunction(self.steps) or isasyncgenfunction(self.steps):
                raise ValueError("Cannot use async function with synchronous execution")
            elif isgeneratorfunction(self.steps):
                content = ""
                for chunk in self.steps(self, execution_input, **kwargs):
                    # Check for cancellation while consuming generator
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
                        content += chunk.content
                    else:
                        content += str(chunk)
                workflow_run_response.content = content
            else:
                # Execute the workflow with the custom executor
                raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs)  # type: ignore[arg-type]

            workflow_run_response.status = RunStatus.completed
        else:
            try:
                # Track outputs from each step for enhanced data flow
                collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
                previous_step_outputs: Dict[str, StepOutput] = {}

                shared_images: List[Image] = execution_input.images or []
                output_images: List[Image] = (execution_input.images or []).copy()  # Start with input images
                shared_videos: List[Video] = execution_input.videos or []
                output_videos: List[Video] = (execution_input.videos or []).copy()  # Start with input videos
                shared_audio: List[Audio] = execution_input.audio or []
                output_audio: List[Audio] = (execution_input.audio or []).copy()  # Start with input audio
                shared_files: List[File] = execution_input.files or []
                output_files: List[File] = (execution_input.files or []).copy()  # Start with input files

                for i, step in enumerate(self.steps):  # type: ignore[arg-type]
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    step_name = getattr(step, "name", f"step_{i + 1}")
                    log_debug(f"Executing step {i + 1}/{self._get_step_count()}: {step_name}")

                    # Create enhanced StepInput
                    step_input = self._create_step_input(
                        execution_input=execution_input,
                        previous_step_outputs=previous_step_outputs,
                        shared_images=shared_images,
                        shared_videos=shared_videos,
                        shared_audio=shared_audio,
                        shared_files=shared_files,
                    )

                    # Check for can cellation before executing step
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore

                    step_output = step.execute(  # type: ignore[union-attr]
                        step_input,
                        session_id=session.session_id,
                        user_id=self.user_id,
                        workflow_run_response=workflow_run_response,
                        run_context=run_context,
                        store_executor_outputs=self.store_executor_outputs,
                        workflow_session=session,
                        add_workflow_history_to_steps=self.add_workflow_history_to_steps
                        if self.add_workflow_history_to_steps
                        else None,
                        num_history_runs=self.num_history_runs,
                        background_tasks=background_tasks,
                    )

                    # Check for cancellation after step execution
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore

                    # Update the workflow-level previous_step_outputs dictionary
                    previous_step_outputs[step_name] = step_output
                    collected_step_outputs.append(step_output)

                    # Update shared media for next step
                    shared_images.extend(step_output.images or [])
                    shared_videos.extend(step_output.videos or [])
                    shared_audio.extend(step_output.audio or [])
                    shared_files.extend(step_output.files or [])
                    output_images.extend(step_output.images or [])
                    output_videos.extend(step_output.videos or [])
                    output_audio.extend(step_output.audio or [])
                    output_files.extend(step_output.files or [])

                    if step_output.stop:
                        logger.info(f"Early termination requested by step {step_name}")
                        break

                # Update the workflow_run_response with completion data
                if collected_step_outputs:
                    # Stop the timer for the Run duration
                    if workflow_run_response.metrics:
                        workflow_run_response.metrics.stop_timer()

                    workflow_run_response.metrics = self._aggregate_workflow_metrics(
                        collected_step_outputs,
                        workflow_run_response.metrics,  # type: ignore[arg-type]
                    )
                    last_output = cast(StepOutput, collected_step_outputs[-1])

                    # Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
                    if getattr(last_output, "steps", None):
                        _cur = last_output
                        while getattr(_cur, "steps", None):
                            _steps = _cur.steps or []
                            if not _steps:
                                break
                            _cur = _steps[-1]
                        workflow_run_response.content = _cur.content
                    else:
                        workflow_run_response.content = last_output.content
                else:
                    workflow_run_response.content = "No steps executed"

                workflow_run_response.step_results = collected_step_outputs
                workflow_run_response.images = output_images
                workflow_run_response.videos = output_videos
                workflow_run_response.audio = output_audio
                workflow_run_response.status = RunStatus.completed

            except (InputCheckError, OutputCheckError) as e:
                log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")
                # Store error response
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Validation failed: {str(e)} | Check: {e.check_trigger}"

                raise e
            except RunCancelledException as e:
                logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled")
                workflow_run_response.status = RunStatus.cancelled
                workflow_run_response.content = str(e)
            except Exception as e:
                import traceback

                traceback.print_exc()
                logger.error(f"Workflow execution failed: {e}")
                # Store error response
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Workflow execution failed: {e}"
                raise e

            finally:
                # Stop timer on error
                if workflow_run_response.metrics:
                    workflow_run_response.metrics.stop_timer()

                self._update_session_metrics(session=session, workflow_run_response=workflow_run_response)
                session.upsert_run(run=workflow_run_response)
                self.save_session(session=session)
                # Always clean up the run tracking
                cleanup_run(workflow_run_response.run_id)  # type: ignore

        # Log Workflow Telemetry
        if self.telemetry:
            self._log_workflow_telemetry(session_id=session.session_id, run_id=workflow_run_response.run_id)

        return workflow_run_response

    def _execute_stream(
        self,
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        workflow_run_response: WorkflowRunOutput,
        run_context: RunContext,
        stream_events: bool = False,
        background_tasks: Optional[Any] = None,
        **kwargs: Any,
    ) -> Iterator[WorkflowRunOutputEvent]:
        """Execute a specific pipeline by name with event streaming"""
        from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction

        workflow_run_response.status = RunStatus.running

        workflow_started_event = WorkflowStartedEvent(
            run_id=workflow_run_response.run_id or "",
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
        )
        yield self._handle_event(workflow_started_event, workflow_run_response)

        if callable(self.steps):
            if iscoroutinefunction(self.steps) or isasyncgenfunction(self.steps):
                raise ValueError("Cannot use async function with synchronous execution")
            elif isgeneratorfunction(self.steps):
                content = ""
                for chunk in self._call_custom_function(self.steps, execution_input, **kwargs):  # type: ignore[arg-type]
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    # Update the run_response with the content from the result
                    if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
                        content += chunk.content
                        yield chunk
                    else:
                        content += str(chunk)
                workflow_run_response.content = content
            else:
                raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs)
            workflow_run_response.status = RunStatus.completed

        else:
            try:
                # Track outputs from each step for enhanced data flow
                collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
                previous_step_outputs: Dict[str, StepOutput] = {}

                shared_images: List[Image] = execution_input.images or []
                output_images: List[Image] = (execution_input.images or []).copy()  # Start with input images
                shared_videos: List[Video] = execution_input.videos or []
                output_videos: List[Video] = (execution_input.videos or []).copy()  # Start with input videos
                shared_audio: List[Audio] = execution_input.audio or []
                output_audio: List[Audio] = (execution_input.audio or []).copy()  # Start with input audio
                shared_files: List[File] = execution_input.files or []
                output_files: List[File] = (execution_input.files or []).copy()  # Start with input files

                early_termination = False

                # Track partial step data in case of cancellation
                current_step_name = ""
                current_step = None
                partial_step_content = ""

                for i, step in enumerate(self.steps):  # type: ignore[arg-type]
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    step_name = getattr(step, "name", f"step_{i + 1}")
                    log_debug(f"Streaming step {i + 1}/{self._get_step_count()}: {step_name}")

                    # Track current step for cancellation handler
                    current_step_name = step_name
                    current_step = step
                    # Reset partial data for this step
                    partial_step_content = ""

                    # Create enhanced StepInput
                    step_input = self._create_step_input(
                        execution_input=execution_input,
                        previous_step_outputs=previous_step_outputs,
                        shared_images=shared_images,
                        shared_videos=shared_videos,
                        shared_audio=shared_audio,
                        shared_files=shared_files,
                    )

                    # Execute step with streaming and yield all events
                    for event in step.execute_stream(  # type: ignore[union-attr]
                        step_input,
                        session_id=session.session_id,
                        user_id=self.user_id,
                        stream_events=stream_events,
                        stream_executor_events=self.stream_executor_events,
                        workflow_run_response=workflow_run_response,
                        run_context=run_context,
                        step_index=i,
                        store_executor_outputs=self.store_executor_outputs,
                        workflow_session=session,
                        add_workflow_history_to_steps=self.add_workflow_history_to_steps
                        if self.add_workflow_history_to_steps
                        else None,
                        num_history_runs=self.num_history_runs,
                        background_tasks=background_tasks,
                    ):
                        raise_if_cancelled(workflow_run_response.run_id)  # type: ignore

                        # Accumulate partial data from streaming events
                        partial_step_content = self._accumulate_partial_step_data(event, partial_step_content)  # type: ignore

                        # Handle events
                        if isinstance(event, StepOutput):
                            step_output = event
                            collected_step_outputs.append(step_output)

                            # Update the workflow-level previous_step_outputs dictionary
                            previous_step_outputs[step_name] = step_output

                            # Transform StepOutput to StepOutputEvent for consistent streaming interface
                            step_output_event = self._transform_step_output_to_event(
                                step_output, workflow_run_response, step_index=i
                            )

                            if step_output.stop:
                                logger.info(f"Early termination requested by step {step_name}")
                                # Update shared media for next step
                                shared_images.extend(step_output.images or [])
                                shared_videos.extend(step_output.videos or [])
                                shared_audio.extend(step_output.audio or [])
                                shared_files.extend(step_output.files or [])
                                output_images.extend(step_output.images or [])
                                output_videos.extend(step_output.videos or [])
                                output_audio.extend(step_output.audio or [])
                                output_files.extend(step_output.files or [])

                                # Only yield StepOutputEvent for function executors, not for agents/teams
                                if getattr(step, "executor_type", None) == "function":
                                    yield step_output_event

                                # Break out of the step loop
                                early_termination = True
                                break

                            # Update shared media for next step
                            shared_images.extend(step_output.images or [])
                            shared_videos.extend(step_output.videos or [])
                            shared_audio.extend(step_output.audio or [])
                            shared_files.extend(step_output.files or [])
                            output_images.extend(step_output.images or [])
                            output_videos.extend(step_output.videos or [])
                            output_audio.extend(step_output.audio or [])
                            output_files.extend(step_output.files or [])

                            # Only yield StepOutputEvent for generator functions, not for agents/teams
                            if getattr(step, "executor_type", None) == "function":
                                yield step_output_event

                        elif isinstance(event, WorkflowRunOutputEvent):  # type: ignore
                            # Enrich event with workflow context before yielding
                            enriched_event = self._enrich_event_with_workflow_context(
                                event, workflow_run_response, step_index=i, step=step
                            )
                            yield self._handle_event(enriched_event, workflow_run_response)  # type: ignore

                        else:
                            # Enrich other events with workflow context before yielding
                            enriched_event = self._enrich_event_with_workflow_context(
                                event, workflow_run_response, step_index=i, step=step
                            )
                            if self.stream_executor_events:
                                yield self._handle_event(enriched_event, workflow_run_response)  # type: ignore

                    # Break out of main step loop if early termination was requested
                    if "early_termination" in locals() and early_termination:
                        break

                # Update the workflow_run_response with completion data
                if collected_step_outputs:
                    # Stop the timer for the Run duration
                    if workflow_run_response.metrics:
                        workflow_run_response.metrics.stop_timer()

                    workflow_run_response.metrics = self._aggregate_workflow_metrics(
                        collected_step_outputs,
                        workflow_run_response.metrics,  # type: ignore[arg-type]
                    )
                    last_output = cast(StepOutput, collected_step_outputs[-1])

                    # Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
                    if getattr(last_output, "steps", None):
                        _cur = last_output
                        while getattr(_cur, "steps", None):
                            _steps = _cur.steps or []
                            if not _steps:
                                break
                            _cur = _steps[-1]
                        workflow_run_response.content = _cur.content
                    else:
                        workflow_run_response.content = last_output.content
                else:
                    workflow_run_response.content = "No steps executed"

                workflow_run_response.step_results = collected_step_outputs
                workflow_run_response.images = output_images
                workflow_run_response.videos = output_videos
                workflow_run_response.audio = output_audio
                workflow_run_response.status = RunStatus.completed

            except (InputCheckError, OutputCheckError) as e:
                log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")

                from agno.run.workflow import WorkflowErrorEvent

                error_event = WorkflowErrorEvent(
                    run_id=workflow_run_response.run_id or "",
                    workflow_id=self.id,
                    workflow_name=self.name,
                    session_id=session.session_id,
                    error=str(e),
                )

                yield error_event

                # Update workflow_run_response with error
                workflow_run_response.content = error_event.error
                workflow_run_response.status = RunStatus.error
            except RunCancelledException as e:
                # Handle run cancellation during streaming
                logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled during streaming")
                workflow_run_response.status = RunStatus.cancelled
                workflow_run_response.content = str(e)

                # Capture partial progress from the step that was cancelled mid-stream
                if partial_step_content:
                    logger.info(
                        f"Step with name  '{current_step_name}' was cancelled. Setting its partial progress as step output."
                    )
                    partial_step_output = StepOutput(
                        step_name=current_step_name,
                        step_id=getattr(current_step, "step_id", None) if current_step else None,
                        step_type=StepType.STEP,
                        executor_type=getattr(current_step, "executor_type", None) if current_step else None,
                        executor_name=getattr(current_step, "executor_name", None) if current_step else None,
                        content=partial_step_content,
                        success=False,
                        error="Cancelled during execution",
                    )
                    collected_step_outputs.append(partial_step_output)

                # Preserve all progress (completed steps + partial step) before cancellation
                if collected_step_outputs:
                    workflow_run_response.step_results = collected_step_outputs
                    # Stop the timer for the Run duration
                    if workflow_run_response.metrics:
                        workflow_run_response.metrics.stop_timer()

                    workflow_run_response.metrics = self._aggregate_workflow_metrics(
                        collected_step_outputs,
                        workflow_run_response.metrics,  # type: ignore[arg-type]
                    )

                cancelled_event = WorkflowCancelledEvent(
                    run_id=workflow_run_response.run_id or "",
                    workflow_id=self.id,
                    workflow_name=self.name,
                    session_id=session.session_id,
                    reason=str(e),
                )
                yield self._handle_event(cancelled_event, workflow_run_response)
            except Exception as e:
                logger.error(f"Workflow execution failed: {e}")

                from agno.run.workflow import WorkflowErrorEvent

                error_event = WorkflowErrorEvent(
                    run_id=workflow_run_response.run_id or "",
                    workflow_id=self.id,
                    workflow_name=self.name,
                    session_id=session.session_id,
                    error=str(e),
                )

                yield error_event

                # Update workflow_run_response with error
                workflow_run_response.content = error_event.error
                workflow_run_response.status = RunStatus.error
                raise e

        # Yield workflow completed event
        workflow_completed_event = WorkflowCompletedEvent(
            run_id=workflow_run_response.run_id or "",
            content=workflow_run_response.content,
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
            step_results=workflow_run_response.step_results,  # type: ignore
            metadata=workflow_run_response.metadata,
        )
        yield self._handle_event(workflow_completed_event, workflow_run_response)

        # Stop timer on error
        if workflow_run_response.metrics:
            workflow_run_response.metrics.stop_timer()

        # Store the completed workflow response
        self._update_session_metrics(session=session, workflow_run_response=workflow_run_response)
        session.upsert_run(run=workflow_run_response)
        self.save_session(session=session)

        # Always clean up the run tracking
        cleanup_run(workflow_run_response.run_id)  # type: ignore

        # Log Workflow Telemetry
        if self.telemetry:
            self._log_workflow_telemetry(session_id=session.session_id, run_id=workflow_run_response.run_id)

    async def _acall_custom_function(
        self, func: Callable, execution_input: WorkflowExecutionInput, **kwargs: Any
    ) -> Any:
        """Call custom function with only the parameters it expects - handles both async functions and async generators"""
        from inspect import isasyncgenfunction, signature

        sig = signature(func)

        # Build arguments based on what the function actually accepts
        call_kwargs: Dict[str, Any] = {}

        # Only add workflow and execution_input if the function expects them
        if "workflow" in sig.parameters:  # type: ignore
            call_kwargs["workflow"] = self
        if "execution_input" in sig.parameters:
            call_kwargs["execution_input"] = execution_input  # type: ignore
        if "session_state" in sig.parameters:
            call_kwargs["session_state"] = self.session_state  # type: ignore

        # Add any other kwargs that the function expects
        for param_name in kwargs:
            if param_name in sig.parameters:  # type: ignore
                call_kwargs[param_name] = kwargs[param_name]

        # If function has **kwargs parameter, pass all remaining kwargs
        for param in sig.parameters.values():  # type: ignore
            if param.kind == param.VAR_KEYWORD:
                call_kwargs.update(kwargs)
                break

        try:
            # Check if it's an async generator function
            if isasyncgenfunction(func):
                # For async generators, call the function and return the async generator directly
                return func(**call_kwargs)  # type: ignore
            else:
                # For regular async functions, await the result
                return await func(**call_kwargs)  # type: ignore
        except TypeError as e:
            # If signature inspection fails, fall back to original method
            logger.warning(
                f"Async function signature inspection failed: {e}. Falling back to original calling convention."
            )
            if isasyncgenfunction(func):
                # For async generators, use the same signature inspection logic in fallback
                return func(**call_kwargs)  # type: ignore
            else:
                # For regular async functions, use the same signature inspection logic in fallback
                return await func(**call_kwargs)  # type: ignore

    async def _aload_or_create_session(
        self, session_id: str, user_id: Optional[str], session_state: Optional[Dict[str, Any]]
    ) -> Tuple[WorkflowSession, Dict[str, Any]]:
        """Load or create session from database, update metadata, and prepare session state.

        Returns:
            Tuple of (workflow_session, prepared_session_state)
        """
        # Read existing session from database
        if self._has_async_db():
            workflow_session = await self.aread_or_create_session(session_id=session_id, user_id=user_id)
        else:
            workflow_session = self.read_or_create_session(session_id=session_id, user_id=user_id)
        self._update_metadata(session=workflow_session)

        # Update session state from DB
        _session_state = session_state if session_state is not None else {}
        _session_state = self._load_session_state(session=workflow_session, session_state=_session_state)

        return workflow_session, _session_state

    async def _aexecute(
        self,
        session_id: str,
        user_id: Optional[str],
        execution_input: WorkflowExecutionInput,
        workflow_run_response: WorkflowRunOutput,
        run_context: RunContext,
        background_tasks: Optional[Any] = None,
        **kwargs: Any,
    ) -> WorkflowRunOutput:
        """Execute a specific pipeline by name asynchronously"""
        from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction

        # Read existing session from database
        workflow_session, run_context.session_state = await self._aload_or_create_session(
            session_id=session_id, user_id=user_id, session_state=run_context.session_state
        )

        workflow_run_response.status = RunStatus.running

        if callable(self.steps):
            # Execute the workflow with the custom executor
            content = ""

            if iscoroutinefunction(self.steps):  # type: ignore
                workflow_run_response.content = await self._acall_custom_function(self.steps, execution_input, **kwargs)
            elif isgeneratorfunction(self.steps):
                for chunk in self.steps(self, execution_input, **kwargs):  # type: ignore[arg-type]
                    if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
                        content += chunk.content
                    else:
                        content += str(chunk)
                workflow_run_response.content = content
            elif isasyncgenfunction(self.steps):  # type: ignore
                async_gen = await self._acall_custom_function(self.steps, execution_input, **kwargs)
                async for chunk in async_gen:
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
                        content += chunk.content
                    else:
                        content += str(chunk)
                workflow_run_response.content = content
            else:
                raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs)
            workflow_run_response.status = RunStatus.completed

        else:
            try:
                # Track outputs from each step for enhanced data flow
                collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
                previous_step_outputs: Dict[str, StepOutput] = {}

                shared_images: List[Image] = execution_input.images or []
                output_images: List[Image] = (execution_input.images or []).copy()  # Start with input images
                shared_videos: List[Video] = execution_input.videos or []
                output_videos: List[Video] = (execution_input.videos or []).copy()  # Start with input videos
                shared_audio: List[Audio] = execution_input.audio or []
                output_audio: List[Audio] = (execution_input.audio or []).copy()  # Start with input audio
                shared_files: List[File] = execution_input.files or []
                output_files: List[File] = (execution_input.files or []).copy()  # Start with input files

                for i, step in enumerate(self.steps):  # type: ignore[arg-type]
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    step_name = getattr(step, "name", f"step_{i + 1}")
                    log_debug(f"Async Executing step {i + 1}/{self._get_step_count()}: {step_name}")

                    # Create enhanced StepInput
                    step_input = self._create_step_input(
                        execution_input=execution_input,
                        previous_step_outputs=previous_step_outputs,
                        shared_images=shared_images,
                        shared_videos=shared_videos,
                        shared_audio=shared_audio,
                        shared_files=shared_files,
                    )

                    # Check for cancellation before executing step
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore

                    step_output = await step.aexecute(  # type: ignore[union-attr]
                        step_input,
                        session_id=session_id,
                        user_id=self.user_id,
                        workflow_run_response=workflow_run_response,
                        run_context=run_context,
                        store_executor_outputs=self.store_executor_outputs,
                        workflow_session=workflow_session,
                        add_workflow_history_to_steps=self.add_workflow_history_to_steps
                        if self.add_workflow_history_to_steps
                        else None,
                        num_history_runs=self.num_history_runs,
                        background_tasks=background_tasks,
                    )

                    # Check for cancellation after step execution
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore

                    # Update the workflow-level previous_step_outputs dictionary
                    previous_step_outputs[step_name] = step_output
                    collected_step_outputs.append(step_output)

                    # Update shared media for next step
                    shared_images.extend(step_output.images or [])
                    shared_videos.extend(step_output.videos or [])
                    shared_audio.extend(step_output.audio or [])
                    shared_files.extend(step_output.files or [])
                    output_images.extend(step_output.images or [])
                    output_videos.extend(step_output.videos or [])
                    output_audio.extend(step_output.audio or [])
                    output_files.extend(step_output.files or [])

                    if step_output.stop:
                        logger.info(f"Early termination requested by step {step_name}")
                        break

                # Update the workflow_run_response with completion data
                if collected_step_outputs:
                    # Stop the timer for the Run duration
                    if workflow_run_response.metrics:
                        workflow_run_response.metrics.stop_timer()

                    workflow_run_response.metrics = self._aggregate_workflow_metrics(
                        collected_step_outputs,
                        workflow_run_response.metrics,  # type: ignore[arg-type]
                    )
                    last_output = cast(StepOutput, collected_step_outputs[-1])

                    # Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
                    if getattr(last_output, "steps", None):
                        _cur = last_output
                        while getattr(_cur, "steps", None):
                            _steps = _cur.steps or []
                            if not _steps:
                                break
                            _cur = _steps[-1]
                        workflow_run_response.content = _cur.content
                    else:
                        workflow_run_response.content = last_output.content
                else:
                    workflow_run_response.content = "No steps executed"

                workflow_run_response.step_results = collected_step_outputs
                workflow_run_response.images = output_images
                workflow_run_response.videos = output_videos
                workflow_run_response.audio = output_audio
                workflow_run_response.status = RunStatus.completed

            except (InputCheckError, OutputCheckError) as e:
                log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")
                # Store error response
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Validation failed: {str(e)} | Check: {e.check_trigger}"

                raise e
            except RunCancelledException as e:
                logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled")
                workflow_run_response.status = RunStatus.cancelled
                workflow_run_response.content = str(e)
            except Exception as e:
                logger.error(f"Workflow execution failed: {e}")
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Workflow execution failed: {e}"
                raise e

        # Stop timer on error
        if workflow_run_response.metrics:
            workflow_run_response.metrics.stop_timer()

        self._update_session_metrics(session=workflow_session, workflow_run_response=workflow_run_response)
        workflow_session.upsert_run(run=workflow_run_response)
        if self._has_async_db():
            await self.asave_session(session=workflow_session)
        else:
            self.save_session(session=workflow_session)
        # Always clean up the run tracking
        cleanup_run(workflow_run_response.run_id)  # type: ignore

        # Log Workflow Telemetry
        if self.telemetry:
            await self._alog_workflow_telemetry(session_id=session_id, run_id=workflow_run_response.run_id)

        return workflow_run_response

    async def _aexecute_stream(
        self,
        session_id: str,
        user_id: Optional[str],
        execution_input: WorkflowExecutionInput,
        workflow_run_response: WorkflowRunOutput,
        run_context: RunContext,
        stream_events: bool = False,
        websocket_handler: Optional[WebSocketHandler] = None,
        background_tasks: Optional[Any] = None,
        **kwargs: Any,
    ) -> AsyncIterator[WorkflowRunOutputEvent]:
        """Execute a specific pipeline by name with event streaming"""
        from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction

        # Read existing session from database
        workflow_session, run_context.session_state = await self._aload_or_create_session(
            session_id=session_id, user_id=user_id, session_state=run_context.session_state
        )

        workflow_run_response.status = RunStatus.running

        workflow_started_event = WorkflowStartedEvent(
            run_id=workflow_run_response.run_id or "",
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
        )
        yield self._handle_event(workflow_started_event, workflow_run_response, websocket_handler=websocket_handler)

        if callable(self.steps):
            if iscoroutinefunction(self.steps):  # type: ignore
                workflow_run_response.content = await self._acall_custom_function(self.steps, execution_input, **kwargs)
            elif isgeneratorfunction(self.steps):
                content = ""
                for chunk in self.steps(self, execution_input, **kwargs):  # type: ignore[arg-type]
                    if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
                        content += chunk.content
                        yield chunk
                    else:
                        content += str(chunk)
                workflow_run_response.content = content
            elif isasyncgenfunction(self.steps):  # type: ignore
                content = ""
                async_gen = await self._acall_custom_function(self.steps, execution_input, **kwargs)
                async for chunk in async_gen:
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
                        content += chunk.content
                        yield chunk
                    else:
                        content += str(chunk)
                workflow_run_response.content = content
            else:
                workflow_run_response.content = self.steps(self, execution_input, **kwargs)
            workflow_run_response.status = RunStatus.completed

        else:
            try:
                # Track outputs from each step for enhanced data flow
                collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
                previous_step_outputs: Dict[str, StepOutput] = {}

                shared_images: List[Image] = execution_input.images or []
                output_images: List[Image] = (execution_input.images or []).copy()  # Start with input images
                shared_videos: List[Video] = execution_input.videos or []
                output_videos: List[Video] = (execution_input.videos or []).copy()  # Start with input videos
                shared_audio: List[Audio] = execution_input.audio or []
                output_audio: List[Audio] = (execution_input.audio or []).copy()  # Start with input audio
                shared_files: List[File] = execution_input.files or []
                output_files: List[File] = (execution_input.files or []).copy()  # Start with input files

                early_termination = False

                # Track partial step data in case of cancellation
                current_step_name = ""
                current_step = None
                partial_step_content = ""

                for i, step in enumerate(self.steps):  # type: ignore[arg-type]
                    if workflow_run_response.run_id:
                        raise_if_cancelled(workflow_run_response.run_id)
                    step_name = getattr(step, "name", f"step_{i + 1}")
                    log_debug(f"Async streaming step {i + 1}/{self._get_step_count()}: {step_name}")

                    current_step_name = step_name
                    current_step = step
                    # Reset partial data for this step
                    partial_step_content = ""

                    # Create enhanced StepInput
                    step_input = self._create_step_input(
                        execution_input=execution_input,
                        previous_step_outputs=previous_step_outputs,
                        shared_images=shared_images,
                        shared_videos=shared_videos,
                        shared_audio=shared_audio,
                        shared_files=shared_files,
                    )

                    # Execute step with streaming and yield all events
                    async for event in step.aexecute_stream(  # type: ignore[union-attr]
                        step_input,
                        session_id=session_id,
                        user_id=self.user_id,
                        stream_events=stream_events,
                        stream_executor_events=self.stream_executor_events,
                        workflow_run_response=workflow_run_response,
                        run_context=run_context,
                        step_index=i,
                        store_executor_outputs=self.store_executor_outputs,
                        workflow_session=workflow_session,
                        add_workflow_history_to_steps=self.add_workflow_history_to_steps
                        if self.add_workflow_history_to_steps
                        else None,
                        num_history_runs=self.num_history_runs,
                        background_tasks=background_tasks,
                    ):
                        if workflow_run_response.run_id:
                            raise_if_cancelled(workflow_run_response.run_id)

                        # Accumulate partial data from streaming events
                        partial_step_content = self._accumulate_partial_step_data(event, partial_step_content)  # type: ignore

                        if isinstance(event, StepOutput):
                            step_output = event
                            collected_step_outputs.append(step_output)

                            # Update the workflow-level previous_step_outputs dictionary
                            previous_step_outputs[step_name] = step_output

                            # Transform StepOutput to StepOutputEvent for consistent streaming interface
                            step_output_event = self._transform_step_output_to_event(
                                step_output, workflow_run_response, step_index=i
                            )

                            if step_output.stop:
                                logger.info(f"Early termination requested by step {step_name}")
                                # Update shared media for next step
                                shared_images.extend(step_output.images or [])
                                shared_videos.extend(step_output.videos or [])
                                shared_audio.extend(step_output.audio or [])
                                shared_files.extend(step_output.files or [])
                                output_images.extend(step_output.images or [])
                                output_videos.extend(step_output.videos or [])
                                output_audio.extend(step_output.audio or [])
                                output_files.extend(step_output.files or [])

                                if getattr(step, "executor_type", None) == "function":
                                    yield step_output_event

                                # Break out of the step loop
                                early_termination = True
                                break

                            # Update shared media for next step
                            shared_images.extend(step_output.images or [])
                            shared_videos.extend(step_output.videos or [])
                            shared_audio.extend(step_output.audio or [])
                            shared_files.extend(step_output.files or [])
                            output_images.extend(step_output.images or [])
                            output_videos.extend(step_output.videos or [])
                            output_audio.extend(step_output.audio or [])
                            output_files.extend(step_output.files or [])

                            # Only yield StepOutputEvent for generator functions, not for agents/teams
                            if getattr(step, "executor_type", None) == "function":
                                yield step_output_event

                        elif isinstance(event, WorkflowRunOutputEvent):  # type: ignore
                            # Enrich event with workflow context before yielding
                            enriched_event = self._enrich_event_with_workflow_context(
                                event, workflow_run_response, step_index=i, step=step
                            )
                            yield self._handle_event(
                                enriched_event, workflow_run_response, websocket_handler=websocket_handler
                            )  # type: ignore

                        else:
                            # Enrich other events with workflow context before yielding
                            enriched_event = self._enrich_event_with_workflow_context(
                                event, workflow_run_response, step_index=i, step=step
                            )
                            if self.stream_executor_events:
                                yield self._handle_event(
                                    enriched_event, workflow_run_response, websocket_handler=websocket_handler
                                )  # type: ignore

                    # Break out of main step loop if early termination was requested
                    if "early_termination" in locals() and early_termination:
                        break

                # Update the workflow_run_response with completion data
                if collected_step_outputs:
                    # Stop the timer for the Run duration
                    if workflow_run_response.metrics:
                        workflow_run_response.metrics.stop_timer()

                    workflow_run_response.metrics = self._aggregate_workflow_metrics(
                        collected_step_outputs,
                        workflow_run_response.metrics,  # type: ignore[arg-type]
                    )
                    last_output = cast(StepOutput, collected_step_outputs[-1])

                    # Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
                    if getattr(last_output, "steps", None):
                        _cur = last_output
                        while getattr(_cur, "steps", None):
                            _steps = _cur.steps or []
                            if not _steps:
                                break
                            _cur = _steps[-1]
                        workflow_run_response.content = _cur.content
                    else:
                        workflow_run_response.content = last_output.content
                else:
                    workflow_run_response.content = "No steps executed"

                workflow_run_response.step_results = collected_step_outputs
                workflow_run_response.images = output_images
                workflow_run_response.videos = output_videos
                workflow_run_response.audio = output_audio
                workflow_run_response.status = RunStatus.completed

            except (InputCheckError, OutputCheckError) as e:
                log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")

                from agno.run.workflow import WorkflowErrorEvent

                error_event = WorkflowErrorEvent(
                    run_id=workflow_run_response.run_id or "",
                    workflow_id=self.id,
                    workflow_name=self.name,
                    session_id=session_id,
                    error=str(e),
                )

                yield error_event

                # Update workflow_run_response with error
                workflow_run_response.content = error_event.error
                workflow_run_response.status = RunStatus.error
            except RunCancelledException as e:
                # Handle run cancellation during streaming
                logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled during streaming")
                workflow_run_response.status = RunStatus.cancelled
                workflow_run_response.content = str(e)

                # Capture partial progress from the step that was cancelled mid-stream
                if partial_step_content:
                    logger.info(
                        f"Step with name  '{current_step_name}' was cancelled. Setting its partial progress as step output."
                    )
                    partial_step_output = StepOutput(
                        step_name=current_step_name,
                        step_id=getattr(current_step, "step_id", None) if current_step else None,
                        step_type=StepType.STEP,
                        executor_type=getattr(current_step, "executor_type", None) if current_step else None,
                        executor_name=getattr(current_step, "executor_name", None) if current_step else None,
                        content=partial_step_content,
                        success=False,
                        error="Cancelled during execution",
                    )
                    collected_step_outputs.append(partial_step_output)

                # Preserve all progress (completed steps + partial step) before cancellation
                if collected_step_outputs:
                    workflow_run_response.step_results = collected_step_outputs
                    # Stop the timer for the Run duration
                    if workflow_run_response.metrics:
                        workflow_run_response.metrics.stop_timer()

                    workflow_run_response.metrics = self._aggregate_workflow_metrics(
                        collected_step_outputs,
                        workflow_run_response.metrics,  # type: ignore[arg-type]
                    )

                cancelled_event = WorkflowCancelledEvent(
                    run_id=workflow_run_response.run_id or "",
                    workflow_id=self.id,
                    workflow_name=self.name,
                    session_id=session_id,
                    reason=str(e),
                )
                yield self._handle_event(
                    cancelled_event,
                    workflow_run_response,
                    websocket_handler=websocket_handler,
                )
            except Exception as e:
                logger.error(f"Workflow execution failed: {e}")

                from agno.run.workflow import WorkflowErrorEvent

                error_event = WorkflowErrorEvent(
                    run_id=workflow_run_response.run_id or "",
                    workflow_id=self.id,
                    workflow_name=self.name,
                    session_id=session_id,
                    error=str(e),
                )

                yield error_event

                # Update workflow_run_response with error
                workflow_run_response.content = error_event.error
                workflow_run_response.status = RunStatus.error
                raise e

        # Yield workflow completed event
        workflow_completed_event = WorkflowCompletedEvent(
            run_id=workflow_run_response.run_id or "",
            content=workflow_run_response.content,
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
            step_results=workflow_run_response.step_results,  # type: ignore[arg-type]
            metadata=workflow_run_response.metadata,
        )
        yield self._handle_event(workflow_completed_event, workflow_run_response, websocket_handler=websocket_handler)

        # Stop timer on error
        if workflow_run_response.metrics:
            workflow_run_response.metrics.stop_timer()

        # Store the completed workflow response
        self._update_session_metrics(session=workflow_session, workflow_run_response=workflow_run_response)
        workflow_session.upsert_run(run=workflow_run_response)
        if self._has_async_db():
            await self.asave_session(session=workflow_session)
        else:
            self.save_session(session=workflow_session)

        # Log Workflow Telemetry
        if self.telemetry:
            await self._alog_workflow_telemetry(session_id=session_id, run_id=workflow_run_response.run_id)

        # Always clean up the run tracking
        cleanup_run(workflow_run_response.run_id)  # type: ignore

    async def _arun_background(
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        **kwargs: Any,
    ) -> WorkflowRunOutput:
        """Execute workflow in background using asyncio.create_task()"""

        run_id = str(uuid4())

        self.initialize_workflow()

        session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

        # Read existing session from database
        workflow_session, session_state = await self._aload_or_create_session(
            session_id=session_id, user_id=user_id, session_state=session_state
        )

        run_context = RunContext(
            run_id=run_id,
            session_id=session_id,
            user_id=user_id,
            session_state=session_state,
        )

        self._prepare_steps()

        # Create workflow run response with PENDING status
        workflow_run_response = WorkflowRunOutput(
            run_id=run_id,
            input=input,
            session_id=session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
            status=RunStatus.pending,
        )

        # Start the run metrics timer
        workflow_run_response.metrics = WorkflowMetrics(steps={})
        workflow_run_response.metrics.start_timer()

        # Store PENDING response immediately
        workflow_session.upsert_run(run=workflow_run_response)
        if self._has_async_db():
            await self.asave_session(session=workflow_session)
        else:
            self.save_session(session=workflow_session)

        # Prepare execution input
        inputs = WorkflowExecutionInput(
            input=input,
            additional_data=additional_data,
            audio=audio,  # type: ignore
            images=images,  # type: ignore
            videos=videos,  # type: ignore
            files=files,  # type: ignore
        )

        self.update_agents_and_teams_session_info()

        async def execute_workflow_background():
            """Simple background execution"""
            try:
                # Update status to RUNNING and save
                workflow_run_response.status = RunStatus.running
                if self._has_async_db():
                    await self.asave_session(session=workflow_session)
                else:
                    self.save_session(session=workflow_session)

                if self.agent is not None:
                    self._aexecute_workflow_agent(
                        user_input=input,  # type: ignore
                        execution_input=inputs,
                        run_context=run_context,
                        stream=False,
                        **kwargs,
                    )
                else:
                    await self._aexecute(
                        session_id=session_id,
                        user_id=user_id,
                        execution_input=inputs,
                        workflow_run_response=workflow_run_response,
                        run_context=run_context,
                        session_state=session_state,
                        **kwargs,
                    )

                log_debug(f"Background execution completed with status: {workflow_run_response.status}")

            except Exception as e:
                logger.error(f"Background workflow execution failed: {e}")
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Background execution failed: {str(e)}"
                if self._has_async_db():
                    await self.asave_session(session=workflow_session)
                else:
                    self.save_session(session=workflow_session)

        # Create and start asyncio task
        loop = asyncio.get_running_loop()
        loop.create_task(execute_workflow_background())

        # Return SAME object that will be updated by background execution
        return workflow_run_response

    async def _arun_background_stream(
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream_events: bool = False,
        websocket_handler: Optional[WebSocketHandler] = None,
        **kwargs: Any,
    ) -> WorkflowRunOutput:
        """Execute workflow in background with streaming and WebSocket broadcasting"""

        run_id = str(uuid4())

        self.initialize_workflow()

        session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

        # Read existing session from database
        workflow_session, session_state = await self._aload_or_create_session(
            session_id=session_id, user_id=user_id, session_state=session_state
        )

        run_context = RunContext(
            run_id=run_id,
            session_id=session_id,
            user_id=user_id,
            session_state=session_state,
        )

        self._prepare_steps()

        # Create workflow run response with PENDING status
        workflow_run_response = WorkflowRunOutput(
            run_id=run_id,
            input=input,
            session_id=session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
            status=RunStatus.pending,
        )

        # Start the run metrics timer
        workflow_run_response.metrics = WorkflowMetrics(steps={})
        workflow_run_response.metrics.start_timer()

        # Prepare execution input
        inputs = WorkflowExecutionInput(
            input=input,
            additional_data=additional_data,
            audio=audio,  # type: ignore
            images=images,  # type: ignore
            videos=videos,  # type: ignore
            files=files,  # type: ignore
        )

        self.update_agents_and_teams_session_info()

        async def execute_workflow_background_stream():
            """Background execution with streaming and WebSocket broadcasting"""
            try:
                if self.agent is not None:
                    result = self._aexecute_workflow_agent(
                        user_input=input,  # type: ignore
                        run_context=run_context,
                        execution_input=inputs,
                        stream=True,
                        websocket_handler=websocket_handler,
                        **kwargs,
                    )
                    # For streaming, result is an async iterator
                    async for event in result:  # type: ignore
                        # Events are automatically broadcast by _handle_event in the agent execution
                        # We just consume them here to drive the execution
                        pass
                    log_debug(
                        f"Background streaming execution (workflow agent) completed with status: {workflow_run_response.status}"
                    )
                else:
                    # Update status to RUNNING and save
                    workflow_run_response.status = RunStatus.running

                    workflow_session.upsert_run(run=workflow_run_response)
                    if self._has_async_db():
                        await self.asave_session(session=workflow_session)
                    else:
                        self.save_session(session=workflow_session)

                    # Execute with streaming - consume all events (they're auto-broadcast via _handle_event)
                    async for event in self._aexecute_stream(
                        session_id=session_id,
                        user_id=user_id,
                        execution_input=inputs,
                        workflow_run_response=workflow_run_response,
                        stream_events=stream_events,
                        run_context=run_context,
                        websocket_handler=websocket_handler,
                        **kwargs,
                    ):
                        # Events are automatically broadcast by _handle_event
                        # We just consume them here to drive the execution
                        pass

                log_debug(f"Background streaming execution completed with status: {workflow_run_response.status}")

            except Exception as e:
                logger.error(f"Background streaming workflow execution failed: {e}")
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Background streaming execution failed: {str(e)}"
                if self._has_async_db():
                    await self.asave_session(session=workflow_session)
                else:
                    self.save_session(session=workflow_session)

        # Create and start asyncio task for background streaming execution
        loop = asyncio.get_running_loop()
        loop.create_task(execute_workflow_background_stream())

        # Return SAME object that will be updated by background execution
        return workflow_run_response

    async def aget_run(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get the status and details of a background workflow run - SIMPLIFIED"""
        # Use provided session_id or fall back to self.session_id
        _session_id = session_id if session_id is not None else self.session_id

        if self.db is not None and _session_id is not None:
            session = await self.db.aget_session(session_id=_session_id, session_type=SessionType.WORKFLOW)  # type: ignore
            if session and isinstance(session, WorkflowSession) and session.runs:
                # Find the run by ID
                for run in session.runs:
                    if run.run_id == run_id:
                        return run

        return None

    def get_run(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get the status and details of a background workflow run - SIMPLIFIED"""
        # Use provided session_id or fall back to self.session_id
        _session_id = session_id if session_id is not None else self.session_id

        if self.db is not None and _session_id is not None:
            session = self.db.get_session(session_id=_session_id, session_type=SessionType.WORKFLOW)
            if session and isinstance(session, WorkflowSession) and session.runs:
                # Find the run by ID
                for run in session.runs:
                    if run.run_id == run_id:
                        return run

        return None

    def _initialize_workflow_agent(
        self,
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        run_context: RunContext,
        stream: bool = False,
    ) -> None:
        """Initialize the workflow agent with tools (but NOT context - that's passed per-run)"""
        from agno.tools.function import Function

        workflow_tool_func = self.agent.create_workflow_tool(  # type: ignore
            workflow=self,
            session=session,
            execution_input=execution_input,
            run_context=run_context,
            stream=stream,
        )
        workflow_tool = Function.from_callable(workflow_tool_func)

        self.agent.tools = [workflow_tool]  # type: ignore
        self.agent._rebuild_tools = True  # type: ignore

        log_debug("Workflow agent initialized with run_workflow tool")

    def _get_workflow_agent_dependencies(self, session: WorkflowSession) -> Dict[str, Any]:
        """Build dependencies dict with workflow context to pass to agent.run()"""
        # Get configuration from the WorkflowAgent instance
        add_history = True
        num_runs = 5

        if self.agent and isinstance(self.agent, WorkflowAgent):
            add_history = self.agent.add_workflow_history
            num_runs = self.agent.num_history_runs or 5

        if add_history:
            history_context = (
                session.get_workflow_history_context(num_runs=num_runs) or "No previous workflow runs in this session."
            )
        else:
            history_context = "No workflow history available."

        # Build workflow context with description and history
        workflow_context = ""
        if self.description:
            workflow_context += f"Workflow Description: {self.description}\n\n"

        workflow_context += history_context

        return {
            "workflow_context": workflow_context,
        }

    def _execute_workflow_agent(
        self,
        user_input: Union[str, Dict[str, Any], List[Any], BaseModel],
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        run_context: RunContext,
        stream: bool = False,
        **kwargs: Any,
    ) -> Union[WorkflowRunOutput, Iterator[WorkflowRunOutputEvent]]:
        """
        Execute the workflow agent in streaming or non-streaming mode.

        The agent decides whether to run the workflow or answer directly from history.

        Args:
            user_input: The user's input
            session: The workflow session
            execution_input: The execution input
            run_context: The run context
            stream: Whether to stream the response
            stream_intermediate_steps: Whether to stream intermediate steps

        Returns:
            WorkflowRunOutput if stream=False, Iterator[WorkflowRunOutputEvent] if stream=True
        """
        if stream:
            return self._run_workflow_agent_stream(
                agent_input=user_input,
                session=session,
                execution_input=execution_input,
                run_context=run_context,
                stream=stream,
                **kwargs,
            )
        else:
            return self._run_workflow_agent(
                agent_input=user_input,
                session=session,
                execution_input=execution_input,
                run_context=run_context,
                stream=stream,
            )

    def _run_workflow_agent_stream(
        self,
        agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        run_context: RunContext,
        stream: bool = False,
        **kwargs: Any,
    ) -> Iterator[WorkflowRunOutputEvent]:
        """
        Execute the workflow agent in streaming mode.

        The agent's tool (run_workflow) is a generator that yields workflow events directly.
        These events bubble up through the agent's streaming and are yielded here.
        We filter to only yield WorkflowRunOutputEvent to the CLI.

        Yields:
            WorkflowRunOutputEvent: Events from workflow execution (agent events are filtered)
        """
        from typing import get_args

        from agno.run.workflow import WorkflowCompletedEvent, WorkflowRunOutputEvent

        # Initialize agent with stream_intermediate_steps=True so tool yields events
        self._initialize_workflow_agent(session, execution_input, run_context=run_context, stream=stream)

        # Build dependencies with workflow context
        run_context.dependencies = self._get_workflow_agent_dependencies(session)

        # Run agent with streaming - workflow events will bubble up from the tool
        agent_response: Optional[RunOutput] = None
        workflow_executed = False

        from agno.run.agent import RunContentEvent
        from agno.run.team import RunContentEvent as TeamRunContentEvent
        from agno.run.workflow import WorkflowAgentCompletedEvent, WorkflowAgentStartedEvent

        log_debug(f"Executing workflow agent with streaming - input: {agent_input}...")

        # Create a workflow run response upfront for potential direct answer (will be used only if workflow is not executed)
        run_id = str(uuid4())
        direct_reply_run_response = WorkflowRunOutput(
            run_id=run_id,
            input=execution_input.input,
            session_id=session.session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
        )

        # Yield WorkflowAgentStartedEvent at the beginning (stored in direct_reply_run_response)
        agent_started_event = WorkflowAgentStartedEvent(
            workflow_name=self.name,
            workflow_id=self.id,
            session_id=session.session_id,
        )
        yield agent_started_event

        # Run the agent in streaming mode and yield all events
        for event in self.agent.run(  # type: ignore[union-attr]
            input=agent_input,
            stream=True,
            stream_intermediate_steps=True,
            yield_run_response=True,
            session_id=session.session_id,
            dependencies=run_context.dependencies,  # Pass context dynamically per-run
            session_state=run_context.session_state,  # Pass session state dynamically per-run
        ):  # type: ignore
            if isinstance(event, tuple(get_args(WorkflowRunOutputEvent))):
                yield event  # type: ignore[misc]

                # Track if workflow was executed by checking for WorkflowCompletedEvent
                if isinstance(event, WorkflowCompletedEvent):
                    workflow_executed = True
            elif isinstance(event, (RunContentEvent, TeamRunContentEvent)):
                if event.step_name is None:
                    # This is from the workflow agent itself
                    # Enrich with metadata to mark it as a workflow agent event

                    if workflow_executed:
                        continue  # Skip if workflow was already executed

                    # workflow_agent field is used by consumers of the events to distinguish between workflow agent and regular agent
                    event.workflow_agent = True  # type: ignore
                yield event  # type: ignore[misc]

            # Capture the final RunOutput (but don't yield it)
            if isinstance(event, RunOutput):
                agent_response = event

        # Handle direct answer case (no workflow execution)
        if not workflow_executed:
            # Update the pre-created workflow run response with the direct answer
            direct_reply_run_response.content = agent_response.content if agent_response else ""
            direct_reply_run_response.status = RunStatus.completed
            direct_reply_run_response.workflow_agent_run = agent_response

            workflow_run_response = direct_reply_run_response

            # Store the full agent RunOutput and establish parent-child relationship
            if agent_response:
                agent_response.parent_run_id = workflow_run_response.run_id
                agent_response.workflow_id = workflow_run_response.workflow_id

            log_debug(f"Agent decision: workflow_executed={workflow_executed}")

            # Yield WorkflowAgentCompletedEvent (user internally by print_response_stream)
            agent_completed_event = WorkflowAgentCompletedEvent(
                run_id=agent_response.run_id if agent_response else None,
                workflow_name=self.name,
                workflow_id=self.id,
                session_id=session.session_id,
                content=workflow_run_response.content,
            )
            yield agent_completed_event

            # Yield a workflow completed event with the agent's direct response
            completed_event = WorkflowCompletedEvent(
                run_id=workflow_run_response.run_id or "",
                content=workflow_run_response.content,
                workflow_name=workflow_run_response.workflow_name,
                workflow_id=workflow_run_response.workflow_id,
                session_id=workflow_run_response.session_id,
                step_results=[],
                metadata={"agent_direct_response": True},
            )
            yield completed_event

            # Update the run in session
            session.upsert_run(run=workflow_run_response)
            # Save session
            self.save_session(session=session)

        else:
            # Workflow was executed by the tool
            reloaded_session = self.get_session(session_id=session.session_id)

            if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0:
                # Get the last run (which is the one just created by the tool)
                last_run = reloaded_session.runs[-1]

                # Yield WorkflowAgentCompletedEvent
                agent_completed_event = WorkflowAgentCompletedEvent(
                    run_id=agent_response.run_id if agent_response else None,
                    workflow_name=self.name,
                    workflow_id=self.id,
                    session_id=session.session_id,
                    content=agent_response.content if agent_response else None,
                )
                yield agent_completed_event

                # Update the last run with workflow_agent_run
                last_run.workflow_agent_run = agent_response

                # Store the full agent RunOutput and establish parent-child relationship
                if agent_response:
                    agent_response.parent_run_id = last_run.run_id
                    agent_response.workflow_id = last_run.workflow_id

                # Save the reloaded session (which has the updated run)
                self.save_session(session=reloaded_session)

            else:
                log_warning("Could not reload session or no runs found after workflow execution")

    def _run_workflow_agent(
        self,
        agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        run_context: RunContext,
        stream: bool = False,
    ) -> WorkflowRunOutput:
        """
        Execute the workflow agent in non-streaming mode.

        The agent decides whether to run the workflow or answer directly from history.

        Returns:
            WorkflowRunOutput: The workflow run output with agent response
        """

        # Initialize the agent
        self._initialize_workflow_agent(session, execution_input, run_context=run_context, stream=stream)

        # Build dependencies with workflow context
        run_context.dependencies = self._get_workflow_agent_dependencies(session)

        # Run the agent
        agent_response: RunOutput = self.agent.run(  # type: ignore[union-attr]
            input=agent_input,
            session_id=session.session_id,
            dependencies=run_context.dependencies,
            session_state=run_context.session_state,
            stream=stream,
        )  # type: ignore

        # Check if the agent called the workflow tool
        workflow_executed = False
        if agent_response.messages:
            for message in agent_response.messages:
                if message.role == "assistant" and message.tool_calls:
                    # Check if the tool call is specifically for run_workflow
                    for tool_call in message.tool_calls:
                        # Handle both dict and object formats
                        if isinstance(tool_call, dict):
                            tool_name = tool_call.get("function", {}).get("name", "")
                        else:
                            tool_name = tool_call.function.name if hasattr(tool_call, "function") else ""

                        if tool_name == "run_workflow":
                            workflow_executed = True
                            break
                    if workflow_executed:
                        break

        log_debug(f"Workflow agent execution complete. Workflow executed: {workflow_executed}")

        # Handle direct answer case (no workflow execution)
        if not workflow_executed:
            # Create a new workflow run output for the direct answer
            run_id = str(uuid4())
            workflow_run_response = WorkflowRunOutput(
                run_id=run_id,
                input=execution_input.input,
                session_id=session.session_id,
                workflow_id=self.id,
                workflow_name=self.name,
                created_at=int(datetime.now().timestamp()),
                content=agent_response.content,
                status=RunStatus.completed,
                workflow_agent_run=agent_response,
            )

            # Store the full agent RunOutput and establish parent-child relationship
            if agent_response:
                agent_response.parent_run_id = workflow_run_response.run_id
                agent_response.workflow_id = workflow_run_response.workflow_id

            # Update the run in session
            session.upsert_run(run=workflow_run_response)
            self.save_session(session=session)

            log_debug(f"Agent decision: workflow_executed={workflow_executed}")

            return workflow_run_response
        else:
            # Workflow was executed by the tool
            reloaded_session = self.get_session(session_id=session.session_id)

            if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0:
                # Get the last run (which is the one just created by the tool)
                last_run = reloaded_session.runs[-1]

                # Update the last run directly with workflow_agent_run
                last_run.workflow_agent_run = agent_response

                # Store the full agent RunOutput and establish parent-child relationship
                if agent_response:
                    agent_response.parent_run_id = last_run.run_id
                    agent_response.workflow_id = last_run.workflow_id

                # Save the reloaded session (which has the updated run)
                self.save_session(session=reloaded_session)

                # Return the last run directly (WRO2 from inner workflow)
                return last_run
            else:
                log_warning("Could not reload session or no runs found after workflow execution")
                # Return a placeholder error response
                return WorkflowRunOutput(
                    run_id=str(uuid4()),
                    input=execution_input.input,
                    session_id=session.session_id,
                    workflow_id=self.id,
                    workflow_name=self.name,
                    created_at=int(datetime.now().timestamp()),
                    content="Error: Workflow execution failed",
                    status=RunStatus.error,
                )

    def _async_initialize_workflow_agent(
        self,
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        run_context: RunContext,
        websocket_handler: Optional[WebSocketHandler] = None,
        stream: bool = False,
    ) -> None:
        """Initialize the workflow agent with async tools (but NOT context - that's passed per-run)"""
        from agno.tools.function import Function

        workflow_tool_func = self.agent.async_create_workflow_tool(  # type: ignore
            workflow=self,
            session=session,
            execution_input=execution_input,
            run_context=run_context,
            stream=stream,
            websocket_handler=websocket_handler,
        )
        workflow_tool = Function.from_callable(workflow_tool_func)

        self.agent.tools = [workflow_tool]  # type: ignore
        self.agent._rebuild_tools = True  # type: ignore

        log_debug("Workflow agent initialized with async run_workflow tool")

    async def _aload_session_for_workflow_agent(
        self,
        session_id: str,
        user_id: Optional[str],
        session_state: Optional[Dict[str, Any]],
    ) -> Tuple[WorkflowSession, Dict[str, Any]]:
        """Helper to load or create session for workflow agent execution"""
        return await self._aload_or_create_session(session_id=session_id, user_id=user_id, session_state=session_state)

    def _aexecute_workflow_agent(
        self,
        user_input: Union[str, Dict[str, Any], List[Any], BaseModel],
        run_context: RunContext,
        execution_input: WorkflowExecutionInput,
        stream: bool = False,
        websocket_handler: Optional[WebSocketHandler] = None,
        **kwargs: Any,
    ):
        """
        Execute the workflow agent asynchronously in streaming or non-streaming mode.

        The agent decides whether to run the workflow or answer directly from history.

        Args:
            user_input: The user's input
            session: The workflow session
            run_context: The run context
            execution_input: The execution input
            stream: Whether to stream the response
            websocket_handler: The WebSocket handler

        Returns:
            Coroutine[WorkflowRunOutput] if stream=False, AsyncIterator[WorkflowRunOutputEvent] if stream=True
        """

        if stream:

            async def _stream():
                session, session_state_loaded = await self._aload_session_for_workflow_agent(
                    run_context.session_id, run_context.user_id, run_context.session_state
                )
                async for event in self._arun_workflow_agent_stream(
                    agent_input=user_input,
                    session=session,
                    execution_input=execution_input,
                    run_context=run_context,
                    stream=stream,
                    websocket_handler=websocket_handler,
                    **kwargs,
                ):
                    yield event

            return _stream()
        else:

            async def _execute():
                session, session_state_loaded = await self._aload_session_for_workflow_agent(
                    run_context.session_id, run_context.user_id, run_context.session_state
                )
                return await self._arun_workflow_agent(
                    agent_input=user_input,
                    session=session,
                    execution_input=execution_input,
                    run_context=run_context,
                    stream=stream,
                )

            return _execute()

    async def _arun_workflow_agent_stream(
        self,
        agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        run_context: RunContext,
        stream: bool = False,
        websocket_handler: Optional[WebSocketHandler] = None,
        **kwargs: Any,
    ) -> AsyncIterator[WorkflowRunOutputEvent]:
        """
        Execute the workflow agent asynchronously in streaming mode.

        The agent's tool (run_workflow) is an async generator that yields workflow events directly.
        These events bubble up through the agent's streaming and are yielded here.
        We filter to only yield WorkflowRunOutputEvent to the CLI.

        Yields:
            WorkflowRunOutputEvent: Events from workflow execution (agent events are filtered)
        """
        from typing import get_args

        from agno.run.workflow import WorkflowCompletedEvent, WorkflowRunOutputEvent

        logger.info("Workflow agent enabled - async streaming mode")
        log_debug(f"User input: {agent_input}")

        self._async_initialize_workflow_agent(
            session,
            execution_input,
            run_context=run_context,
            stream=stream,
            websocket_handler=websocket_handler,
        )

        run_context.dependencies = self._get_workflow_agent_dependencies(session)

        agent_response: Optional[RunOutput] = None
        workflow_executed = False

        from agno.run.agent import RunContentEvent
        from agno.run.team import RunContentEvent as TeamRunContentEvent
        from agno.run.workflow import WorkflowAgentCompletedEvent, WorkflowAgentStartedEvent

        log_debug(f"Executing async workflow agent with streaming - input: {agent_input}...")

        # Create a workflow run response upfront for potential direct answer (will be used only if workflow is not executed)
        run_id = str(uuid4())
        direct_reply_run_response = WorkflowRunOutput(
            run_id=run_id,
            input=execution_input.input,
            session_id=session.session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
        )

        # Yield WorkflowAgentStartedEvent at the beginning (stored in direct_reply_run_response)
        agent_started_event = WorkflowAgentStartedEvent(
            workflow_name=self.name,
            workflow_id=self.id,
            session_id=session.session_id,
        )
        self._broadcast_to_websocket(agent_started_event, websocket_handler)
        yield agent_started_event

        # Run the agent in streaming mode and yield all events
        async for event in self.agent.arun(  # type: ignore[union-attr]
            input=agent_input,
            stream=True,
            stream_intermediate_steps=True,
            yield_run_response=True,
            session_id=session.session_id,
            dependencies=run_context.dependencies,  # Pass context dynamically per-run
            session_state=run_context.session_state,  # Pass session state dynamically per-run
        ):  # type: ignore
            if isinstance(event, tuple(get_args(WorkflowRunOutputEvent))):
                yield event  # type: ignore[misc]

                if isinstance(event, WorkflowCompletedEvent):
                    workflow_executed = True
                    log_debug("Workflow execution detected via WorkflowCompletedEvent")

            elif isinstance(event, (RunContentEvent, TeamRunContentEvent)):
                if event.step_name is None:
                    # This is from the workflow agent itself
                    # Enrich with metadata to mark it as a workflow agent event

                    if workflow_executed:
                        continue  # Skip if workflow was already executed

                    # workflow_agent field is used by consumers of the events to distinguish between workflow agent and regular agent
                    event.workflow_agent = True  # type: ignore

                    # Broadcast to WebSocket if available (async context only)
                    self._broadcast_to_websocket(event, websocket_handler)

                yield event  # type: ignore[misc]

            # Capture the final RunOutput (but don't yield it)
            if isinstance(event, RunOutput):
                agent_response = event
                log_debug(
                    f"Agent response: {str(agent_response.content)[:100] if agent_response.content else 'None'}..."
                )

        # Handle direct answer case (no workflow execution)
        if not workflow_executed:
            # Update the pre-created workflow run response with the direct answer
            direct_reply_run_response.content = agent_response.content if agent_response else ""
            direct_reply_run_response.status = RunStatus.completed
            direct_reply_run_response.workflow_agent_run = agent_response

            workflow_run_response = direct_reply_run_response

            # Store the full agent RunOutput and establish parent-child relationship
            if agent_response:
                agent_response.parent_run_id = workflow_run_response.run_id
                agent_response.workflow_id = workflow_run_response.workflow_id

            # Yield WorkflowAgentCompletedEvent
            agent_completed_event = WorkflowAgentCompletedEvent(
                workflow_name=self.name,
                workflow_id=self.id,
                run_id=agent_response.run_id if agent_response else None,
                session_id=session.session_id,
                content=workflow_run_response.content,
            )
            self._broadcast_to_websocket(agent_completed_event, websocket_handler)
            yield agent_completed_event

            # Yield a workflow completed event with the agent's direct response (user internally by aprint_response_stream)
            completed_event = WorkflowCompletedEvent(
                run_id=workflow_run_response.run_id or "",
                content=workflow_run_response.content,
                workflow_name=workflow_run_response.workflow_name,
                workflow_id=workflow_run_response.workflow_id,
                session_id=workflow_run_response.session_id,
                step_results=[],
                metadata={"agent_direct_response": True},
            )
            yield completed_event

            # Update the run in session
            session.upsert_run(run=workflow_run_response)
            # Save session
            if self._has_async_db():
                await self.asave_session(session=session)
            else:
                self.save_session(session=session)

        else:
            # Workflow was executed by the tool
            if self._has_async_db():
                reloaded_session = await self.aget_session(session_id=session.session_id)
            else:
                reloaded_session = self.get_session(session_id=session.session_id)

            if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0:
                # Get the last run (which is the one just created by the tool)
                last_run = reloaded_session.runs[-1]

                # Yield WorkflowAgentCompletedEvent
                agent_completed_event = WorkflowAgentCompletedEvent(
                    run_id=agent_response.run_id if agent_response else None,
                    workflow_name=self.name,
                    workflow_id=self.id,
                    session_id=session.session_id,
                    content=agent_response.content if agent_response else None,
                )

                self._broadcast_to_websocket(agent_completed_event, websocket_handler)

                yield agent_completed_event

                # Update the last run with workflow_agent_run
                last_run.workflow_agent_run = agent_response

                # Store the full agent RunOutput and establish parent-child relationship
                if agent_response:
                    agent_response.parent_run_id = last_run.run_id
                    agent_response.workflow_id = last_run.workflow_id

                # Save the reloaded session (which has the updated run)
                if self._has_async_db():
                    await self.asave_session(session=reloaded_session)
                else:
                    self.save_session(session=reloaded_session)

            else:
                log_warning("Could not reload session or no runs found after workflow execution")

    async def _arun_workflow_agent(
        self,
        agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        run_context: RunContext,
        stream: bool = False,
    ) -> WorkflowRunOutput:
        """
        Execute the workflow agent asynchronously in non-streaming mode.

        The agent decides whether to run the workflow or answer directly from history.

        Returns:
            WorkflowRunOutput: The workflow run output with agent response
        """
        # Initialize the agent
        self._async_initialize_workflow_agent(session, execution_input, run_context=run_context, stream=stream)

        # Build dependencies with workflow context
        run_context.dependencies = self._get_workflow_agent_dependencies(session)

        # Run the agent
        agent_response: RunOutput = await self.agent.arun(  # type: ignore[union-attr]
            input=agent_input,
            session_id=session.session_id,
            dependencies=run_context.dependencies,
            session_state=run_context.session_state,
            stream=stream,
        )  # type: ignore

        # Check if the agent called the workflow tool
        workflow_executed = False
        if agent_response.messages:
            for message in agent_response.messages:
                if message.role == "assistant" and message.tool_calls:
                    # Check if the tool call is specifically for run_workflow
                    for tool_call in message.tool_calls:
                        # Handle both dict and object formats
                        if isinstance(tool_call, dict):
                            tool_name = tool_call.get("function", {}).get("name", "")
                        else:
                            tool_name = tool_call.function.name if hasattr(tool_call, "function") else ""

                        if tool_name == "run_workflow":
                            workflow_executed = True
                            break
                    if workflow_executed:
                        break

        # Handle direct answer case (no workflow execution)
        if not workflow_executed:
            # Create a new workflow run output for the direct answer
            run_id = str(uuid4())
            workflow_run_response = WorkflowRunOutput(
                run_id=run_id,
                input=execution_input.input,
                session_id=session.session_id,
                workflow_id=self.id,
                workflow_name=self.name,
                created_at=int(datetime.now().timestamp()),
                content=agent_response.content,
                status=RunStatus.completed,
                workflow_agent_run=agent_response,
            )

            # Store the full agent RunOutput and establish parent-child relationship
            if agent_response:
                agent_response.parent_run_id = workflow_run_response.run_id
                agent_response.workflow_id = workflow_run_response.workflow_id

            # Update the run in session
            session.upsert_run(run=workflow_run_response)
            if self._has_async_db():
                await self.asave_session(session=session)
            else:
                self.save_session(session=session)

            log_debug(f"Agent decision: workflow_executed={workflow_executed}")

            return workflow_run_response
        else:
            # Workflow was executed by the tool
            logger.info("=" * 80)
            logger.info("WORKFLOW AGENT: Called run_workflow tool (async)")
            logger.info("  ➜ Workflow was executed, retrieving results...")
            logger.info("=" * 80)

            log_debug("Reloading session from database to get the latest workflow run...")
            if self._has_async_db():
                reloaded_session = await self.aget_session(session_id=session.session_id)
            else:
                reloaded_session = self.get_session(session_id=session.session_id)

            if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0:
                # Get the last run (which is the one just created by the tool)
                last_run = reloaded_session.runs[-1]
                log_debug(f"Retrieved latest workflow run: {last_run.run_id}")
                log_debug(f"Total workflow runs in session: {len(reloaded_session.runs)}")

                # Update the last run with workflow_agent_run
                last_run.workflow_agent_run = agent_response

                # Store the full agent RunOutput and establish parent-child relationship
                if agent_response:
                    agent_response.parent_run_id = last_run.run_id
                    agent_response.workflow_id = last_run.workflow_id

                # Save the reloaded session (which has the updated run)
                if self._has_async_db():
                    await self.asave_session(session=reloaded_session)
                else:
                    self.save_session(session=reloaded_session)

                log_debug(f"Agent decision: workflow_executed={workflow_executed}")

                # Return the last run directly (WRO2 from inner workflow)
                return last_run
            else:
                log_warning("Could not reload session or no runs found after workflow execution")
                # Return a placeholder error response
                return WorkflowRunOutput(
                    run_id=str(uuid4()),
                    input=execution_input.input,
                    session_id=session.session_id,
                    workflow_id=self.id,
                    workflow_name=self.name,
                    created_at=int(datetime.now().timestamp()),
                    content="Error: Workflow execution failed",
                    status=RunStatus.error,
                )

    def cancel_run(self, run_id: str) -> bool:
        """Cancel a running workflow execution.

        Args:
            run_id (str): The run_id to cancel.

        Returns:
            bool: True if the run was found and marked for cancellation, False otherwise.
        """
        return cancel_run_global(run_id)

    @overload
    def run(
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: Literal[False] = False,
        stream_events: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        background: Optional[bool] = False,
        background_tasks: Optional[Any] = None,
    ) -> WorkflowRunOutput: ...

    @overload
    def run(
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: Literal[True] = True,
        stream_events: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        background: Optional[bool] = False,
        background_tasks: Optional[Any] = None,
    ) -> Iterator[WorkflowRunOutputEvent]: ...

    def run(
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: bool = False,
        stream_events: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        background: Optional[bool] = False,
        background_tasks: Optional[Any] = None,
        **kwargs: Any,
    ) -> Union[WorkflowRunOutput, Iterator[WorkflowRunOutputEvent]]:
        """Execute the workflow synchronously with optional streaming"""
        if self._has_async_db():
            raise Exception("`run()` is not supported with an async DB. Please use `arun()`.")

        # Create a run_id for this specific run and register immediately for cancellation tracking
        run_id = str(uuid4())
        register_run(run_id)

        input = self._validate_input(input)
        if background:
            raise RuntimeError("Background execution is not supported for sync run()")

        self._set_debug()

        self.initialize_workflow()
        session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

        # Read existing session from database
        workflow_session = self.read_or_create_session(session_id=session_id, user_id=user_id)
        self._update_metadata(session=workflow_session)

        # Initialize session state
        session_state = self._initialize_session_state(
            session_state=session_state if session_state is not None else {},
            user_id=user_id,
            session_id=session_id,
            run_id=run_id,
        )
        # Update session state from DB
        session_state = self._load_session_state(session=workflow_session, session_state=session_state)

        log_debug(f"Workflow Run Start: {self.name}", center=True)

        # Use simple defaults
        stream = stream or self.stream or False
        stream_events = (stream_events or stream_intermediate_steps) or (
            self.stream_events or self.stream_intermediate_steps
        )

        # Can't stream events if streaming is disabled
        if stream is False:
            stream_events = False

        log_debug(f"Stream: {stream}")
        log_debug(f"Total steps: {self._get_step_count()}")

        # Prepare steps
        self._prepare_steps()

        inputs = WorkflowExecutionInput(
            input=input,
            additional_data=additional_data,
            audio=audio,  # type: ignore
            images=images,  # type: ignore
            videos=videos,  # type: ignore
            files=files,  # type: ignore
        )
        log_debug(
            f"Created pipeline input with session state keys: {list(session_state.keys()) if session_state else 'None'}"
        )

        self.update_agents_and_teams_session_info()

        # Initialize run context
        run_context = RunContext(
            run_id=run_id,
            session_id=session_id,
            user_id=user_id,
            session_state=session_state,
        )

        # Execute workflow agent if configured
        if self.agent is not None:
            return self._execute_workflow_agent(
                user_input=input,  # type: ignore
                session=workflow_session,
                execution_input=inputs,
                run_context=run_context,
                stream=stream,
                **kwargs,
            )

        # Create workflow run response for regular workflow execution
        workflow_run_response = WorkflowRunOutput(
            run_id=run_id,
            input=input,
            session_id=session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
        )

        # Start the run metrics timer
        workflow_run_response.metrics = WorkflowMetrics(steps={})
        workflow_run_response.metrics.start_timer()

        if stream:
            return self._execute_stream(
                session=workflow_session,
                execution_input=inputs,  # type: ignore[arg-type]
                workflow_run_response=workflow_run_response,
                stream_events=stream_events,
                run_context=run_context,
                background_tasks=background_tasks,
                **kwargs,
            )
        else:
            return self._execute(
                session=workflow_session,
                execution_input=inputs,  # type: ignore[arg-type]
                workflow_run_response=workflow_run_response,
                run_context=run_context,
                background_tasks=background_tasks,
                **kwargs,
            )

    @overload
    async def arun(
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: Literal[False] = False,
        stream_events: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        background: Optional[bool] = False,
        websocket: Optional[WebSocket] = None,
        background_tasks: Optional[Any] = None,
    ) -> WorkflowRunOutput: ...

    @overload
    def arun(
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: Literal[True] = True,
        stream_events: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        background: Optional[bool] = False,
        websocket: Optional[WebSocket] = None,
        background_tasks: Optional[Any] = None,
    ) -> AsyncIterator[WorkflowRunOutputEvent]: ...

    def arun(  # type: ignore
        self,
        input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: bool = False,
        stream_events: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = False,
        background: Optional[bool] = False,
        websocket: Optional[WebSocket] = None,
        background_tasks: Optional[Any] = None,
        **kwargs: Any,
    ) -> Union[WorkflowRunOutput, AsyncIterator[WorkflowRunOutputEvent]]:
        """Execute the workflow synchronously with optional streaming"""

        input = self._validate_input(input)

        websocket_handler = None
        if websocket:
            from agno.workflow.types import WebSocketHandler

            websocket_handler = WebSocketHandler(websocket=websocket)

        if background:
            if stream and websocket:
                # Consider both stream_events and stream_intermediate_steps (deprecated)
                if stream_intermediate_steps is not None:
                    warnings.warn(
                        "The 'stream_intermediate_steps' parameter is deprecated and will be removed in future versions. Use 'stream_events' instead.",
                        DeprecationWarning,
                        stacklevel=2,
                    )
                stream_events = stream_events or stream_intermediate_steps or False

                # Background + Streaming + WebSocket = Real-time events
                return self._arun_background_stream(  # type: ignore
                    input=input,
                    additional_data=additional_data,
                    user_id=user_id,
                    session_id=session_id,
                    session_state=session_state,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    stream_events=stream_events,
                    websocket_handler=websocket_handler,
                    **kwargs,
                )
            elif stream and not websocket:
                # Background + Streaming but no WebSocket = Not supported
                raise ValueError("Background streaming execution requires a WebSocket for real-time events")
            else:
                # Background + Non-streaming = Polling (existing)
                return self._arun_background(  # type: ignore
                    input=input,
                    additional_data=additional_data,
                    user_id=user_id,
                    session_id=session_id,
                    session_state=session_state,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    **kwargs,
                )

        self._set_debug()

        # Create a run_id for this specific run and register immediately for cancellation tracking
        run_id = str(uuid4())
        register_run(run_id)

        self.initialize_workflow()
        session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

        # Initialize run context
        run_context = RunContext(
            run_id=run_id,
            session_id=session_id,
            user_id=user_id,
            session_state=session_state,
        )

        log_debug(f"Async Workflow Run Start: {self.name}", center=True)

        # Use simple defaults
        stream = stream or self.stream or False
        stream_events = (stream_events or stream_intermediate_steps) or (
            self.stream_events or self.stream_intermediate_steps
        )

        # Can't stream events if streaming is disabled
        if stream is False:
            stream_events = False

        log_debug(f"Stream: {stream}")

        # Prepare steps
        self._prepare_steps()

        inputs = WorkflowExecutionInput(
            input=input,
            additional_data=additional_data,
            audio=audio,  # type: ignore
            images=images,  # type: ignore
            videos=videos,  # type: ignore
            files=files,
        )
        log_debug(
            f"Created async pipeline input with session state keys: {list(session_state.keys()) if session_state else 'None'}"
        )

        self.update_agents_and_teams_session_info()

        if self.agent is not None:
            return self._aexecute_workflow_agent(  # type: ignore
                user_input=input,  # type: ignore
                execution_input=inputs,
                run_context=run_context,
                stream=stream,
                **kwargs,
            )

        # Create workflow run response for regular workflow execution
        workflow_run_response = WorkflowRunOutput(
            run_id=run_id,
            input=input,
            session_id=session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
        )

        # Start the run metrics timer
        workflow_run_response.metrics = WorkflowMetrics(steps={})
        workflow_run_response.metrics.start_timer()

        if stream:
            return self._aexecute_stream(  # type: ignore
                execution_input=inputs,
                workflow_run_response=workflow_run_response,
                session_id=session_id,
                user_id=user_id,
                stream_events=stream_events,
                websocket=websocket,
                files=files,
                session_state=session_state,
                run_context=run_context,
                background_tasks=background_tasks,
                **kwargs,
            )
        else:
            return self._aexecute(  # type: ignore
                execution_input=inputs,
                workflow_run_response=workflow_run_response,
                session_id=session_id,
                user_id=user_id,
                websocket=websocket,
                files=files,
                session_state=session_state,
                run_context=run_context,
                background_tasks=background_tasks,
                **kwargs,
            )

    def _prepare_steps(self):
        """Prepare the steps for execution"""
        if not callable(self.steps) and self.steps is not None:
            prepared_steps: List[Union[Step, Steps, Loop, Parallel, Condition, Router]] = []
            for i, step in enumerate(self.steps):  # type: ignore
                if callable(step) and hasattr(step, "__name__"):
                    step_name = step.__name__
                    log_debug(f"Step {i + 1}: Wrapping callable function '{step_name}'")
                    prepared_steps.append(Step(name=step_name, description="User-defined callable step", executor=step))  # type: ignore
                elif isinstance(step, Agent):
                    step_name = step.name or f"step_{i + 1}"
                    log_debug(f"Step {i + 1}: Agent '{step_name}'")
                    prepared_steps.append(Step(name=step_name, description=step.description, agent=step))
                elif isinstance(step, Team):
                    step_name = step.name or f"step_{i + 1}"
                    log_debug(f"Step {i + 1}: Team '{step_name}' with {len(step.members)} members")
                    prepared_steps.append(Step(name=step_name, description=step.description, team=step))
                elif isinstance(step, Step) and step.add_workflow_history is True and self.db is None:
                    log_warning(
                        f"Step '{step.name or f'step_{i + 1}'}' has add_workflow_history=True "
                        "but no database is configured in the Workflow. "
                        "History won't be persisted. Add a database to persist runs across executions."
                    )
                elif isinstance(step, (Step, Steps, Loop, Parallel, Condition, Router)):
                    step_type = type(step).__name__
                    step_name = getattr(step, "name", f"unnamed_{step_type.lower()}")
                    log_debug(f"Step {i + 1}: {step_type} '{step_name}'")
                    prepared_steps.append(step)
                else:
                    raise ValueError(f"Invalid step type: {type(step).__name__}")

            self.steps = prepared_steps  # type: ignore
            log_debug("Step preparation completed")

    def print_response(
        self,
        input: Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]],
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: Optional[bool] = None,
        markdown: bool = True,
        show_time: bool = True,
        show_step_details: bool = True,
        console: Optional[Any] = None,
        **kwargs: Any,
    ) -> None:
        """Print workflow execution with rich formatting and optional streaming

        Args:
            input: The main query/input for the workflow
            additional_data: Attached message data to the input
            user_id: User ID
            session_id: Session ID
            audio: Audio input
            images: Image input
            videos: Video input
            files: File input
            stream: Whether to stream the response content
            markdown: Whether to render content as markdown
            show_time: Whether to show execution time
            show_step_details: Whether to show individual step outputs
            console: Rich console instance (optional)
        """
        if self._has_async_db():
            raise Exception("`print_response()` is not supported with an async DB. Please use `aprint_response()`.")

        if stream is None:
            stream = self.stream or False

        if "stream_events" in kwargs:
            kwargs.pop("stream_events")

        if stream:
            print_response_stream(
                workflow=self,
                input=input,
                user_id=user_id,
                session_id=session_id,
                additional_data=additional_data,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                stream_events=True,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                console=console,
                **kwargs,
            )
        else:
            print_response(
                workflow=self,
                input=input,
                user_id=user_id,
                session_id=session_id,
                additional_data=additional_data,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                console=console,
                **kwargs,
            )

    async def aprint_response(
        self,
        input: Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]],
        additional_data: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        audio: Optional[List[Audio]] = None,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        files: Optional[List[File]] = None,
        stream: Optional[bool] = None,
        markdown: bool = True,
        show_time: bool = True,
        show_step_details: bool = True,
        console: Optional[Any] = None,
        **kwargs: Any,
    ) -> None:
        """Print workflow execution with rich formatting and optional streaming

        Args:
            input: The main message/input for the workflow
            additional_data: Attached message data to the input
            user_id: User ID
            session_id: Session ID
            audio: Audio input
            images: Image input
            videos: Video input
            files: Files input
            stream: Whether to stream the response content
            markdown: Whether to render content as markdown
            show_time: Whether to show execution time
            show_step_details: Whether to show individual step outputs
            console: Rich console instance (optional)
        """
        if stream is None:
            stream = self.stream or False

        if "stream_events" in kwargs:
            kwargs.pop("stream_events")

        if stream:
            await aprint_response_stream(
                workflow=self,
                input=input,
                additional_data=additional_data,
                user_id=user_id,
                session_id=session_id,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                stream_events=True,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                console=console,
                **kwargs,
            )
        else:
            await aprint_response(
                workflow=self,
                input=input,
                additional_data=additional_data,
                user_id=user_id,
                session_id=session_id,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                console=console,
                **kwargs,
            )

    def to_dict(self) -> Dict[str, Any]:
        """Convert workflow to dictionary representation"""

        def serialize_step(step):
            # Handle callable functions (not wrapped in Step objects)
            if callable(step) and hasattr(step, "__name__"):
                step_dict = {
                    "name": step.__name__,
                    "description": "User-defined callable step",
                    "type": StepType.STEP.value,
                }
                return step_dict

            # Handle Agent and Team objects directly
            if isinstance(step, Agent):
                step_dict = {
                    "name": step.name or "unnamed_agent",
                    "description": step.description or "Agent step",
                    "type": StepType.STEP.value,
                    "agent": step,
                }
                return step_dict

            if isinstance(step, Team):
                step_dict = {
                    "name": step.name or "unnamed_team",
                    "description": step.description or "Team step",
                    "type": StepType.STEP.value,
                    "team": step,
                }
                return step_dict

            step_dict = {
                "name": step.name if hasattr(step, "name") else f"unnamed_{type(step).__name__.lower()}",
                "description": step.description if hasattr(step, "description") else "User-defined callable step",
                "type": STEP_TYPE_MAPPING[type(step)].value,  # type: ignore
            }

            # Handle agent/team/tools
            if hasattr(step, "agent"):
                step_dict["agent"] = step.agent if hasattr(step, "agent") else None  # type: ignore
            if hasattr(step, "team"):
                step_dict["team"] = step.team if hasattr(step, "team") else None  # type: ignore

            # Handle nested steps for Router/Loop
            if isinstance(step, Router):
                step_dict["steps"] = (
                    [serialize_step(step) for step in step.choices] if hasattr(step, "choices") else None
                )

            elif isinstance(step, (Loop, Condition, Steps, Parallel)):
                step_dict["steps"] = [serialize_step(step) for step in step.steps] if hasattr(step, "steps") else None

            return step_dict

        if self.steps is None or callable(self.steps):
            steps_list = []
        elif isinstance(self.steps, Steps):
            steps_list = self.steps.steps
        else:
            steps_list = self.steps

        return {
            "name": self.name,
            "workflow_id": self.id,
            "description": self.description,
            "steps": [serialize_step(s) for s in steps_list],
            "session_id": self.session_id,
        }

    def _calculate_session_metrics_from_workflow_metrics(self, workflow_metrics: WorkflowMetrics) -> Metrics:
        """Calculate session metrics by aggregating all step metrics from workflow metrics"""
        session_metrics = Metrics()

        # Aggregate metrics from all steps
        for step_name, step_metrics in workflow_metrics.steps.items():
            if step_metrics.metrics:
                session_metrics += step_metrics.metrics

        session_metrics.time_to_first_token = None

        return session_metrics

    def _get_session_metrics(self, session: WorkflowSession) -> Metrics:
        """Get existing session metrics from the database"""
        if session.session_data and "session_metrics" in session.session_data:
            session_metrics_from_db = session.session_data.get("session_metrics")
            if session_metrics_from_db is not None:
                if isinstance(session_metrics_from_db, dict):
                    return Metrics(**session_metrics_from_db)
                elif isinstance(session_metrics_from_db, Metrics):
                    return session_metrics_from_db
        return Metrics()

    def _update_session_metrics(self, session: WorkflowSession, workflow_run_response: WorkflowRunOutput):
        """Calculate and update session metrics"""
        # Get existing session metrics
        session_metrics = self._get_session_metrics(session=session)

        # If workflow has metrics, convert and add them to session metrics
        if workflow_run_response.metrics:
            run_session_metrics = self._calculate_session_metrics_from_workflow_metrics(workflow_run_response.metrics)  # type: ignore[arg-type]

            session_metrics += run_session_metrics

        session_metrics.time_to_first_token = None

        # Store updated session metrics - CONVERT TO DICT FOR JSON SERIALIZATION
        if not session.session_data:
            session.session_data = {}
        session.session_data["session_metrics"] = session_metrics.to_dict()

    async def aget_session_metrics(self, session_id: Optional[str] = None) -> Optional[Metrics]:
        """Get the session metrics for the given session ID and user ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is required")

        session = await self.aget_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")

        return self._get_session_metrics(session=session)

    def get_session_metrics(self, session_id: Optional[str] = None) -> Optional[Metrics]:
        """Get the session metrics for the given session ID and user ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is required")

        session = self.get_session(session_id=session_id)
        if session is None:
            raise Exception("Session not found")

        return self._get_session_metrics(session=session)

    def update_agents_and_teams_session_info(self):
        """Update agents and teams with workflow session information"""
        log_debug("Updating agents and teams with session information")
        # Initialize steps - only if steps is iterable (not callable)
        if self.steps and not callable(self.steps):
            steps_list = self.steps.steps if isinstance(self.steps, Steps) else self.steps
            for step in steps_list:
                # TODO: Handle properly steps inside other primitives
                if isinstance(step, Step):
                    active_executor = step.active_executor

                    if hasattr(active_executor, "workflow_id"):
                        active_executor.workflow_id = self.id

                    # If it's a team, update all members
                    if hasattr(active_executor, "members"):
                        for member in active_executor.members:  # type: ignore
                            if hasattr(member, "workflow_id"):
                                member.workflow_id = self.id

    def propagate_run_hooks_in_background(self, run_in_background: bool = True) -> None:
        """
        Propagate _run_hooks_in_background setting to this workflow and all agents/teams in steps.

        This method sets _run_hooks_in_background on the workflow and all agents/teams
        within its steps, including nested teams and their members.

        Args:
            run_in_background: Whether hooks should run in background. Defaults to True.
        """
        self._run_hooks_in_background = run_in_background

        if not self.steps or callable(self.steps):
            return

        steps_list = self.steps.steps if isinstance(self.steps, Steps) else self.steps

        for step in steps_list:
            self._propagate_hooks_to_step(step, run_in_background)

    def _propagate_hooks_to_step(self, step: Any, run_in_background: bool) -> None:
        """Recursively propagate _run_hooks_in_background to a step and its nested content."""
        # Handle Step objects with active executor
        if hasattr(step, "active_executor") and step.active_executor:
            executor = step.active_executor
            # If it's a team, use its propagation method
            if hasattr(executor, "propagate_run_hooks_in_background"):
                executor.propagate_run_hooks_in_background(run_in_background)
            elif hasattr(executor, "_run_hooks_in_background"):
                executor._run_hooks_in_background = run_in_background

        # Handle agent/team directly on step
        if hasattr(step, "agent") and step.agent:
            if hasattr(step.agent, "_run_hooks_in_background"):
                step.agent._run_hooks_in_background = run_in_background
        if hasattr(step, "team") and step.team:
            # Use team's method to propagate to all nested members
            if hasattr(step.team, "propagate_run_hooks_in_background"):
                step.team.propagate_run_hooks_in_background(run_in_background)
            elif hasattr(step.team, "_run_hooks_in_background"):
                step.team._run_hooks_in_background = run_in_background

        # Handle nested primitives - check 'steps' and 'choices' attributes
        for attr_name in ["steps", "choices"]:
            if hasattr(step, attr_name):
                attr_value = getattr(step, attr_name)
                if attr_value and isinstance(attr_value, list):
                    for nested_step in attr_value:
                        self._propagate_hooks_to_step(nested_step, run_in_background)

    ###########################################################################
    # Telemetry functions
    ###########################################################################

    def _get_telemetry_data(self) -> Dict[str, Any]:
        """Get the telemetry data for the workflow"""
        return {
            "workflow_id": self.id,
            "db_type": self.db.__class__.__name__ if self.db else None,
            "has_input_schema": self.input_schema is not None,
        }

    def _log_workflow_telemetry(self, session_id: str, run_id: Optional[str] = None) -> None:
        """Send a telemetry event to the API for a created Workflow run"""

        self._set_telemetry()
        if not self.telemetry:
            return

        from agno.api.workflow import WorkflowRunCreate, create_workflow_run

        try:
            create_workflow_run(
                workflow=WorkflowRunCreate(session_id=session_id, run_id=run_id, data=self._get_telemetry_data()),
            )
        except Exception as e:
            log_debug(f"Could not create Workflow run telemetry event: {e}")

    async def _alog_workflow_telemetry(self, session_id: str, run_id: Optional[str] = None) -> None:
        """Send a telemetry event to the API for a created Workflow async run"""

        self._set_telemetry()
        if not self.telemetry:
            return

        from agno.api.workflow import WorkflowRunCreate, acreate_workflow_run

        try:
            await acreate_workflow_run(
                workflow=WorkflowRunCreate(session_id=session_id, run_id=run_id, data=self._get_telemetry_data())
            )
        except Exception as e:
            log_debug(f"Could not create Workflow run telemetry event: {e}")

    def cli_app(
        self,
        input: Optional[str] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        user: str = "User",
        emoji: str = ":technologist:",
        stream: Optional[bool] = None,
        markdown: bool = True,
        show_time: bool = True,
        show_step_details: bool = True,
        exit_on: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """
        Run an interactive command-line interface to interact with the workflow.

        This method creates a CLI interface that allows users to interact with the workflow
        either by providing a single input or through continuous interactive prompts.

        Arguments:
            input: Optional initial input to process before starting interactive mode.
            session_id: Optional session identifier for maintaining conversation context.
            user_id: Optional user identifier for tracking user-specific data.
            user: Display name for the user in the CLI prompt. Defaults to "User".
            emoji: Emoji to display next to the user name in prompts. Defaults to ":technologist:".
            stream: Whether to stream the workflow response. If None, uses workflow default.
            markdown: Whether to render output as markdown. Defaults to True.
            show_time: Whether to display timestamps in the output. Defaults to True.
            show_step_details: Whether to show detailed step information. Defaults to True.
            exit_on: List of commands that will exit the CLI. Defaults to ["exit", "quit", "bye", "stop"].
            **kwargs: Additional keyword arguments passed to the workflow's print_response method.

        Returns:
            None: This method runs interactively and does not return a value.
        """

        from rich.prompt import Prompt

        if input:
            self.print_response(
                input=input,
                stream=stream,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                user_id=user_id,
                session_id=session_id,
                **kwargs,
            )

        _exit_on = exit_on or ["exit", "quit", "bye", "stop"]
        while True:
            message = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
            if message in _exit_on:
                break

            self.print_response(
                input=message,
                stream=stream,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                user_id=user_id,
                session_id=session_id,
                **kwargs,
            )

    async def acli_app(
        self,
        input: Optional[str] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        user: str = "User",
        emoji: str = ":technologist:",
        stream: Optional[bool] = None,
        markdown: bool = True,
        show_time: bool = True,
        show_step_details: bool = True,
        exit_on: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """
        Run an interactive command-line interface to interact with the workflow.

        This method creates a CLI interface that allows users to interact with the workflow
        either by providing a single input or through continuous interactive prompts.

        Arguments:
            input: Optional initial input to process before starting interactive mode.
            session_id: Optional session identifier for maintaining conversation context.
            user_id: Optional user identifier for tracking user-specific data.
            user: Display name for the user in the CLI prompt. Defaults to "User".
            emoji: Emoji to display next to the user name in prompts. Defaults to ":technologist:".
            stream: Whether to stream the workflow response. If None, uses workflow default.
            markdown: Whether to render output as markdown. Defaults to True.
            show_time: Whether to display timestamps in the output. Defaults to True.
            show_step_details: Whether to show detailed step information. Defaults to True.
            exit_on: List of commands that will exit the CLI. Defaults to ["exit", "quit", "bye", "stop"].
            **kwargs: Additional keyword arguments passed to the workflow's print_response method.

        Returns:
            None: This method runs interactively and does not return a value.
        """

        from rich.prompt import Prompt

        if input:
            await self.aprint_response(
                input=input,
                stream=stream,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                user_id=user_id,
                session_id=session_id,
                **kwargs,
            )

        _exit_on = exit_on or ["exit", "quit", "bye", "stop"]
        while True:
            message = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
            if message in _exit_on:
                break

            await self.aprint_response(
                input=message,
                stream=stream,
                markdown=markdown,
                show_time=show_time,
                show_step_details=show_step_details,
                user_id=user_id,
                session_id=session_id,
                **kwargs,
            )
